1use super::base::{parse_identifier, parse_operator, parse_value};
10use super::expressions::parse_expression;
11use crate::ast::*;
12use nom::{
13 IResult, Parser,
14 branch::alt,
15 bytes::complete::{tag, tag_no_case},
16 character::complete::{char, multispace0, multispace1},
17 combinator::{map, opt, peek},
18 multi::separated_list0,
19 sequence::preceded,
20};
21
22pub fn parse_function_or_aggregate(input: &str) -> IResult<&str, Expr> {
24 let (input, name) = parse_identifier(input)?;
26 let (input, _) = multispace0(input)?;
27 let (input, _) = char('(').parse(input)?;
28 let (input, _) = multispace0(input)?;
29
30 let (input, has_distinct) = opt((tag_no_case("distinct"), multispace1)).parse(input)?;
31 let distinct = has_distinct.is_some();
32
33 let (input, args) =
34 separated_list0((multispace0, char(','), multispace0), parse_function_arg).parse(input)?;
35
36 let (input, _) = multispace0(input)?;
37 let (input, _) = char(')').parse(input)?;
38 let (input, _) = multispace0(input)?;
39
40 let (input, filter_clause) = opt(parse_filter_clause).parse(input)?;
41
42 let (input, _) = multispace0(input)?;
43 if let Ok((remaining, _)) = tag_no_case::<_, _, nom::error::Error<&str>>("over").parse(input) {
44 let (remaining, _) = multispace0(remaining)?;
45 let (remaining, _) = char('(').parse(remaining)?;
46 let (remaining, _) = multispace0(remaining)?;
47
48 let (remaining, partition) = opt(parse_partition_by).parse(remaining)?;
49 let partition = partition.unwrap_or_default();
50 let (remaining, _) = multispace0(remaining)?;
51
52 let (remaining, order) = opt(parse_window_order_by).parse(remaining)?;
53 let order = order.unwrap_or_default();
54 let (remaining, _) = multispace0(remaining)?;
55
56 let (remaining, frame) = opt(parse_window_frame).parse(remaining)?;
58 let (remaining, _) = multispace0(remaining)?;
59
60 let (remaining, _) = char(')').parse(remaining)?;
62 let (remaining, _) = multispace0(remaining)?;
63
64 let (remaining, alias) = opt(preceded(
66 (multispace0, tag_no_case("as"), multispace1),
67 parse_identifier,
68 ))
69 .parse(remaining)?;
70 let alias_str = alias
71 .map(|s| s.to_string())
72 .unwrap_or_else(|| name.to_string());
73
74 let params: Vec<Value> = args
75 .iter()
76 .map(|e| Value::Function(e.to_string()))
77 .collect();
78
79 return Ok((
80 remaining,
81 Expr::Window {
82 name: alias_str,
83 func: name.to_string(),
84 params,
85 partition,
86 order,
87 frame,
88 },
89 ));
90 }
91
92 let (input, alias) = opt(preceded(
94 (multispace0, tag_no_case("as"), multispace1),
95 parse_identifier,
96 ))
97 .parse(input)?;
98 let alias = alias.map(|s| s.to_string());
99
100 let name_lower = name.to_lowercase();
101 match name_lower.as_str() {
102 "count" | "sum" | "avg" | "min" | "max" => {
103 let col = args
105 .first()
106 .map(|e| e.to_string())
107 .unwrap_or_else(|| "*".to_string());
108 let func = match name_lower.as_str() {
109 "count" => AggregateFunc::Count,
110 "sum" => AggregateFunc::Sum,
111 "avg" => AggregateFunc::Avg,
112 "min" => AggregateFunc::Min,
113 "max" => AggregateFunc::Max,
114 _ => AggregateFunc::Count, };
116 Ok((
117 input,
118 Expr::Aggregate {
119 col,
120 func,
121 distinct,
122 filter: filter_clause,
123 alias,
124 },
125 ))
126 }
127 _ => Ok((
128 input,
129 Expr::FunctionCall {
130 name: name.to_string(),
131 args,
132 alias,
133 },
134 )),
135 }
136}
137
138pub fn parse_function_arg(input: &str) -> IResult<&str, Expr> {
140 alt((map(tag("*"), |_| Expr::Star), parse_expression)).parse(input)
141}
142
143fn parse_filter_clause(input: &str) -> IResult<&str, Vec<Condition>> {
145 let (input, _) = tag_no_case("filter").parse(input)?;
146 let (input, _) = multispace0(input)?;
147 let (input, _) = char('(').parse(input)?;
148 let (input, _) = multispace0(input)?;
149 let (input, _) = tag_no_case("where").parse(input)?;
150 let (input, _) = multispace1(input)?;
151
152 let (input, conditions) = parse_filter_conditions(input)?;
153
154 let (input, _) = multispace0(input)?;
155 let (input, _) = char(')').parse(input)?;
156
157 Ok((input, conditions))
158}
159
160fn parse_filter_conditions(input: &str) -> IResult<&str, Vec<Condition>> {
162 let mut conditions = Vec::new();
163 let mut current_input = input;
164
165 loop {
166 let (input, _) = multispace0(current_input)?;
167 let (input, col) = parse_identifier(input)?;
168 let (input, _) = multispace0(input)?;
169 let (input, op) = parse_operator(input)?;
170 let (input, _) = multispace0(input)?;
171
172 let (input, value) = if matches!(op, Operator::IsNull | Operator::IsNotNull) {
174 (input, Value::Null)
175 } else if matches!(op, Operator::In | Operator::NotIn) {
176 let (input, _) = char('(').parse(input)?;
177 let (input, _) = multispace0(input)?;
178 let (input, values) =
179 separated_list0((multispace0, char(','), multispace0), parse_value).parse(input)?;
180 let (input, _) = multispace0(input)?;
181 let (input, _) = char(')').parse(input)?;
182 (input, Value::Array(values))
183 } else if matches!(op, Operator::Between | Operator::NotBetween) {
184 let (input, min_val) = parse_value(input)?;
185 let (input, _) = multispace1(input)?;
186 let (input, _) = tag_no_case("and").parse(input)?;
187 let (input, _) = multispace1(input)?;
188 let (input, max_val) = parse_value(input)?;
189 (input, Value::Array(vec![min_val, max_val]))
191 } else {
192 parse_filter_value(input)?
194 };
195
196 conditions.push(Condition {
197 left: Expr::Named(col.to_string()),
198 op,
199 value,
200 is_array_unnest: false,
201 });
202
203 current_input = input;
204
205 let and_result: IResult<&str, _> = preceded(
206 (multispace0, tag_no_case("and"), multispace1),
207 peek(parse_identifier),
208 )
209 .parse(current_input);
210
211 if let Ok((_next_input, _)) = and_result {
212 let (next_input, _) = multispace0(current_input)?;
214 let (next_input, _) = tag_no_case("and").parse(next_input)?;
215 let (next_input, _) = multispace1(next_input)?;
216 current_input = next_input;
217 } else {
218 break;
219 }
220 }
221
222 Ok((current_input, conditions))
223}
224
225fn parse_filter_value(input: &str) -> IResult<&str, Value> {
228 if let Ok((remaining, val)) = parse_value(input) {
231 if let Value::Interval { amount, unit } = val {
233 return Ok((
234 remaining,
235 Value::Function(format!("INTERVAL '{} {}'", amount, unit)),
236 ));
237 }
238
239 if !matches!(val, Value::Function(_)) {
241 return Ok((remaining, val));
242 }
243 }
244
245 let mut end_pos = 0;
249 let mut paren_depth = 0;
250
251 for (i, c) in input.char_indices() {
252 match c {
253 '(' => paren_depth += 1,
254 ')' => {
255 if paren_depth == 0 {
256 end_pos = i;
257 break;
258 }
259 paren_depth -= 1;
260 }
261 _ => {}
262 }
263
264 if paren_depth == 0 && i > 0 {
265 let remaining = &input[i..];
266 if remaining.len() >= 4 {
267 let potential_and = &remaining[..4].to_lowercase();
268 if potential_and.starts_with("and ")
269 || potential_and.starts_with("and\t")
270 || potential_and.starts_with("and\n")
271 {
272 end_pos = i;
273 break;
274 }
275 }
276 }
277 }
278
279 if end_pos == 0 {
280 end_pos = input.len();
281 }
282
283 let expr_str = input[..end_pos].trim();
284 if expr_str.is_empty() {
285 return Err(nom::Err::Error(nom::error::Error::new(
286 input,
287 nom::error::ErrorKind::TakeWhile1,
288 )));
289 }
290
291 if let Ok((_, expr)) = parse_expression(expr_str) {
293 return Ok((&input[end_pos..], Value::Function(expr.to_string())));
294 }
295
296 Ok((&input[end_pos..], Value::Function(expr_str.to_string())))
297}
298
299fn parse_partition_by(input: &str) -> IResult<&str, Vec<String>> {
301 let (input, _) = tag_no_case("partition").parse(input)?;
302 let (input, _) = multispace1(input)?;
303 let (input, _) = tag_no_case("by").parse(input)?;
304 let (input, _) = multispace1(input)?;
305
306 let (input, cols) =
307 separated_list0((multispace0, char(','), multispace0), parse_identifier).parse(input)?;
308
309 Ok((input, cols.into_iter().map(|s| s.to_string()).collect()))
310}
311
312fn parse_window_order_by(input: &str) -> IResult<&str, Vec<Cage>> {
314 let (input, _) = tag_no_case("order").parse(input)?;
315 let (input, _) = multispace1(input)?;
316 let (input, _) = tag_no_case("by").parse(input)?;
317 let (input, _) = multispace1(input)?;
318
319 let (input, order_parts) = separated_list0(
320 (multispace0, char(','), multispace0),
321 parse_window_sort_item,
322 )
323 .parse(input)?;
324
325 Ok((input, order_parts))
326}
327
328fn parse_window_sort_item(input: &str) -> IResult<&str, Cage> {
330 use nom::combinator::value;
331
332 let (input, col) = parse_identifier(input)?;
333 let (input, _) = multispace0(input)?;
334
335 let (input, order) = opt(alt((
336 value(SortOrder::Desc, tag_no_case("desc")),
337 value(SortOrder::Asc, tag_no_case("asc")),
338 )))
339 .parse(input)?;
340
341 Ok((
342 input,
343 Cage {
344 kind: CageKind::Sort(order.unwrap_or(SortOrder::Asc)),
345 conditions: vec![Condition {
346 left: Expr::Named(col.to_string()),
347 op: Operator::Eq,
348 value: Value::Null,
349 is_array_unnest: false,
350 }],
351 logical_op: LogicalOp::And,
352 },
353 ))
354}
355
356fn parse_window_frame(input: &str) -> IResult<&str, WindowFrame> {
358 use nom::combinator::value;
359
360 let (input, is_rows) = alt((
362 value(true, tag_no_case("rows")),
363 value(false, tag_no_case("range")),
364 )).parse(input)?;
365 let (input, _) = multispace1(input)?;
366 let (input, _) = tag_no_case("between").parse(input)?;
367 let (input, _) = multispace1(input)?;
368
369 let (input, start) = parse_frame_bound(input)?;
371 let (input, _) = multispace1(input)?;
372 let (input, _) = tag_no_case("and").parse(input)?;
373 let (input, _) = multispace1(input)?;
374
375 let (input, end) = parse_frame_bound(input)?;
377
378 let frame = if is_rows {
379 WindowFrame::Rows { start, end }
380 } else {
381 WindowFrame::Range { start, end }
382 };
383
384 Ok((input, frame))
385}
386
387fn parse_frame_bound(input: &str) -> IResult<&str, FrameBound> {
389 use nom::combinator::value;
390 use nom::character::complete::i32 as parse_i32;
391
392 alt((
393 value(FrameBound::UnboundedPreceding,
395 (tag_no_case("unbounded"), multispace1, tag_no_case("preceding"))),
396 value(FrameBound::UnboundedFollowing,
398 (tag_no_case("unbounded"), multispace1, tag_no_case("following"))),
399 value(FrameBound::CurrentRow,
401 (tag_no_case("current"), multispace1, tag_no_case("row"))),
402 map((parse_i32, multispace1, tag_no_case("preceding")),
404 |(n, _, _)| FrameBound::Preceding(n)),
405 map((parse_i32, multispace1, tag_no_case("following")),
407 |(n, _, _)| FrameBound::Following(n)),
408 )).parse(input)
409}