Skip to main content

reddb_rql/parser/
table.rs

1//! Table query parsing (SELECT ... FROM ...)
2
3use super::error::ParseError;
4use crate::ast::{
5    BinOp, CompareOp, Expr, FieldRef, Filter, OrderByClause, Projection, QueryExpr,
6    QueueSelectQuery, SelectItem, Span, TableQuery, UnaryOp,
7};
8use crate::lexer::Token;
9use crate::sql_lowering::{expr_to_projection, filter_to_expr, select_item_to_projection};
10use reddb_types::types::Value;
11
/// Reports whether `name` is one of the scalar function names listed below.
///
/// The lookup is an exact, case-sensitive string comparison; every entry in
/// the table is upper-case, so a lower- or mixed-case spelling does not match.
fn is_scalar_function(name: &str) -> bool {
    // Kept in the same order as the original pattern list.
    const SCALAR_FUNCTIONS: &[&str] = &[
        "GEO_DISTANCE",
        "GEO_DISTANCE_VINCENTY",
        "GEO_BEARING",
        "GEO_MIDPOINT",
        "HAVERSINE",
        "VINCENTY",
        "TIME_BUCKET",
        "UPPER",
        "LOWER",
        "LENGTH",
        "CHAR_LENGTH",
        "CHARACTER_LENGTH",
        "OCTET_LENGTH",
        "BIT_LENGTH",
        "SUBSTRING",
        "SUBSTR",
        "POSITION",
        "TRIM",
        "LTRIM",
        "RTRIM",
        "BTRIM",
        "CONCAT",
        "CONCAT_WS",
        "REVERSE",
        "LEFT",
        "RIGHT",
        "QUOTE_LITERAL",
        "ABS",
        "ROUND",
        "COALESCE",
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
        "MONEY",
        "MONEY_ASSET",
        "MONEY_MINOR",
        "MONEY_SCALE",
        "VERIFY_PASSWORD",
        "CAST",
        "CASE",
    ];

    SCALAR_FUNCTIONS.contains(&name)
}
64
/// Reports whether `name` is one of the aggregate function names listed
/// below.
///
/// The comparison is exact and case-sensitive (all entries are upper-case),
/// so callers are expected to normalise the name before asking.
fn is_aggregate_function(name: &str) -> bool {
    const AGGREGATE_FUNCTIONS: &[&str] = &[
        "COUNT",
        "AVG",
        "SUM",
        "MIN",
        "MAX",
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
    ];

    AGGREGATE_FUNCTIONS.contains(&name)
}
85
86fn aggregate_token_name(token: &Token) -> Option<&'static str> {
87    match token {
88        Token::Count => Some("COUNT"),
89        Token::Sum => Some("SUM"),
90        Token::Avg => Some("AVG"),
91        Token::Min => Some("MIN"),
92        Token::Max => Some("MAX"),
93        Token::First => Some("FIRST"),
94        Token::Last => Some("LAST"),
95        _ => None,
96    }
97}
98
99fn scalar_token_name(token: &Token) -> Option<&'static str> {
100    match token {
101        Token::Left => Some("LEFT"),
102        Token::Right => Some("RIGHT"),
103        _ => None,
104    }
105}
106use super::Parser;
107
108impl<'a> Parser<'a> {
109    /// Parse SELECT ... FROM ... query
110    pub fn parse_select_query(&mut self) -> Result<QueryExpr, ParseError> {
111        // Recursion guard: nested subqueries (UNION, derived tables,
112        // EXISTS) re-enter through this point, so depth here bounds
113        // the SELECT-shaped recursion in addition to the expr Pratt
114        // climb guarded in `parse_expr_prec`.
115        self.enter_depth()?;
116        let result = self.parse_select_query_inner();
117        self.exit_depth();
118        result
119    }
120
121    /// Parse the comma-separated argument list of a table-valued function
122    /// call. The opening `(` has already been consumed; the caller consumes
123    /// the closing `)`. Requires at least one argument and rejects malformed
124    /// forms (issue #795).
125    ///
126    /// Three argument shapes are accepted (issues #796 / #799):
127    /// - positional identifiers, e.g. the graph collection `g`;
128    /// - named numeric arguments `key => <number>`, e.g. `resolution => 0.5`;
129    /// - named subquery arguments `key => (<SELECT …>)`, e.g.
130    ///   `nodes => (SELECT id FROM hosts)` (the inline-graph form).
131    ///
132    /// Positional arguments must precede named arguments; a positional
133    /// argument after any named one is a clear error. Returns the positional
134    /// identifiers, the named numeric `(key, value)` pairs, and the named
135    /// subquery `(key, query)` pairs, each in source order.
136    #[allow(clippy::type_complexity)]
137    fn parse_table_function_args(
138        &mut self,
139        name: &str,
140    ) -> Result<(Vec<String>, Vec<(String, f64)>, Vec<(String, QueryExpr)>), ParseError> {
141        // Zero-argument form `name()` is rejected with a clear message.
142        if matches!(self.peek(), Token::RParen) {
143            return Err(ParseError::new(
144                format!("table function '{name}' requires at least one argument"),
145                self.position(),
146            ));
147        }
148
149        let mut args = Vec::new();
150        let mut named_args = Vec::new();
151        let mut subquery_args: Vec<(String, QueryExpr)> = Vec::new();
152        loop {
153            // Each argument starts with an identifier (the positional value or
154            // the named-argument key). A handful of named-argument keys lex as
155            // reserved keywords (e.g. `max_iterations`); accept those here so
156            // the centrality TVFs (issue #797) can name them, mapping the token
157            // back to its lowercase identifier spelling.
158            let ident = match self.advance()? {
159                Token::Ident(arg) => arg,
160                Token::MaxIterations => "max_iterations".to_string(),
161                other => {
162                    return Err(ParseError::expected(
163                        vec!["table function argument identifier"],
164                        &other,
165                        self.position(),
166                    ));
167                }
168            };
169
170            // `ident => (SELECT …)` is a named subquery argument;
171            // `ident => <number>` is a named numeric argument; otherwise it is
172            // a bare positional identifier.
173            if matches!(self.peek(), Token::FatArrow) {
174                self.advance()?; // consume '=>'
175                if matches!(self.peek(), Token::LParen) {
176                    self.advance()?; // consume '('
177                    if !self.check(&Token::Select) {
178                        let found = self.peek().clone();
179                        return Err(ParseError::expected(
180                            vec!["SELECT subquery"],
181                            &found,
182                            self.position(),
183                        ));
184                    }
185                    let query = self.parse_select_query()?;
186                    self.expect(Token::RParen)?;
187                    subquery_args.push((ident, query));
188                } else {
189                    let value = self.parse_float()?;
190                    named_args.push((ident, value));
191                }
192            } else if named_args.is_empty() && subquery_args.is_empty() {
193                args.push(ident);
194            } else {
195                return Err(ParseError::new(
196                    format!(
197                        "table function '{name}' positional argument '{ident}' cannot follow a named argument"
198                    ),
199                    self.position(),
200                ));
201            }
202
203            // A comma continues the list; a `)` ends it (consumed by caller).
204            // Anything else is a clear error (e.g. a missing closing paren).
205            match self.peek() {
206                Token::Comma => {
207                    self.advance()?;
208                    continue;
209                }
210                Token::RParen => break,
211                _ => {
212                    let found = self.peek().clone();
213                    return Err(ParseError::expected(
214                        vec!["','", "')'"],
215                        &found,
216                        self.position(),
217                    ));
218                }
219            }
220        }
221        Ok((args, named_args, subquery_args))
222    }
223
224    /// Build the `TableSource` for a table-valued function call from its
225    /// parsed argument lists. When `subquery_args` is non-empty the call uses
226    /// the inline-graph form (`nodes => / edges =>`); otherwise it is the
227    /// graph-collection form (issue #799).
228    fn build_table_function_source(
229        &self,
230        name: String,
231        args: Vec<String>,
232        named_args: Vec<(String, f64)>,
233        subquery_args: Vec<(String, QueryExpr)>,
234    ) -> Result<crate::ast::TableSource, ParseError> {
235        use crate::ast::TableSource;
236
237        if subquery_args.is_empty() {
238            return Ok(TableSource::Function {
239                name,
240                args,
241                named_args,
242            });
243        }
244
245        // Inline-graph form: exactly one `nodes` and one `edges` subquery, no
246        // positional graph-collection argument.
247        if !args.is_empty() {
248            return Err(ParseError::new(
249                format!(
250                    "table function '{name}' inline form takes no positional graph argument; pass `nodes => (…), edges => (…)`"
251                ),
252                self.position(),
253            ));
254        }
255
256        let mut nodes: Option<QueryExpr> = None;
257        let mut edges: Option<QueryExpr> = None;
258        for (key, query) in subquery_args {
259            if key.eq_ignore_ascii_case("nodes") {
260                if nodes.is_some() {
261                    return Err(ParseError::new(
262                        format!(
263                            "table function '{name}' has a duplicate 'nodes' subquery argument"
264                        ),
265                        self.position(),
266                    ));
267                }
268                nodes = Some(query);
269            } else if key.eq_ignore_ascii_case("edges") {
270                if edges.is_some() {
271                    return Err(ParseError::new(
272                        format!(
273                            "table function '{name}' has a duplicate 'edges' subquery argument"
274                        ),
275                        self.position(),
276                    ));
277                }
278                edges = Some(query);
279            } else {
280                return Err(ParseError::new(
281                    format!(
282                        "table function '{name}' has no subquery argument '{key}' (expected 'nodes' or 'edges')"
283                    ),
284                    self.position(),
285                ));
286            }
287        }
288
289        let (Some(nodes), Some(edges)) = (nodes, edges) else {
290            return Err(ParseError::new(
291                format!(
292                    "table function '{name}' inline form requires both `nodes => (…)` and `edges => (…)` subqueries"
293                ),
294                self.position(),
295            ));
296        };
297
298        Ok(TableSource::InlineGraphFunction {
299            name,
300            nodes: Box::new(nodes),
301            edges: Box::new(edges),
302            named_args,
303        })
304    }
305
306    /// Read one segment of a dotted table name. The graph analytics outputs
307    /// `components` and `centrality` lex as reserved keywords, so they are
308    /// normalised back to their lowercase spelling here; every other segment
309    /// (e.g. `communities`, or virtual-schema suffixes like `collections`) is
310    /// an ordinary identifier. Issue #800.
311    fn parse_table_name_segment(&mut self) -> Result<String, ParseError> {
312        match self.peek() {
313            Token::Components => {
314                self.advance()?;
315                Ok("components".to_string())
316            }
317            Token::Centrality => {
318                self.advance()?;
319                Ok("centrality".to_string())
320            }
321            _ => self.expect_ident(),
322        }
323    }
324
325    fn parse_select_query_inner(&mut self) -> Result<QueryExpr, ParseError> {
326        self.expect(Token::Select)?;
327
328        // `SELECT DISTINCT <projection>` — the projection-level quantifier
329        // (issue #1126). Detected immediately after SELECT, before the
330        // projection list, so it never collides with the aggregate-argument
331        // form `COUNT(DISTINCT x)`, which is parsed inside the call args.
332        let distinct = self.consume(&Token::Distinct)?;
333
334        // Parse column list
335        let (select_items, columns) = self.parse_select_items_and_projections()?;
336
337        // Parse optional table source. If omitted, default to `ANY` so the query
338        // can return mixed entities (table, document, graph, and vector) by default.
339        let has_from = self.consume(&Token::From)?;
340        // Optional structured FROM source. Currently populated only for
341        // table-valued function calls such as `components(g)` (issue #795);
342        // plain tables leave this `None` and rely on the legacy `table` slot.
343        let mut table_source: Option<crate::ast::TableSource> = None;
344        let mut from_subquery: Option<QueryExpr> = None;
345        let table = if has_from {
346            if self.consume(&Token::Queue)? {
347                let queue = self.expect_ident()?;
348                let filter = if self.consume(&Token::Where)? {
349                    Some(self.parse_filter()?)
350                } else {
351                    None
352                };
353                let limit = if self.consume(&Token::Limit)? {
354                    Some(self.parse_integer()? as u64)
355                } else {
356                    None
357                };
358                return Ok(QueryExpr::QueueSelect(QueueSelectQuery {
359                    queue,
360                    columns: queue_projection_columns(&columns)?,
361                    filter,
362                    limit,
363                }));
364            } else if self.check(&Token::LParen) {
365                self.advance()?; // consume '('
366                if !self.check(&Token::Select) {
367                    return Err(ParseError::new(
368                        "subquery in FROM must start with SELECT".to_string(),
369                        self.position(),
370                    ));
371                }
372                from_subquery = Some(self.parse_select_query()?);
373                self.expect(Token::RParen)?;
374                "__subq_pending".to_string()
375            } else if self.consume(&Token::Star)? {
376                "*".to_string()
377            } else if self.consume(&Token::All)? {
378                "all".to_string()
379            } else if matches!(self.peek(), Token::Components) {
380                // `components` lexes as the graph-analytics keyword
381                // `Token::Components`, so it never reaches `expect_ident`.
382                // In FROM position it is the connected-components
383                // table-valued function: `FROM components(g)` (issue #795).
384                self.advance()?; // consume COMPONENTS
385                let name = "components".to_string();
386                self.expect(Token::LParen)?;
387                let (args, named_args, subquery_args) = self.parse_table_function_args(&name)?;
388                self.expect(Token::RParen)?;
389                table_source = Some(self.build_table_function_source(
390                    name.clone(),
391                    args,
392                    named_args,
393                    subquery_args,
394                )?);
395                name
396            } else if matches!(self.peek(), Token::ShortestPath) {
397                // `shortest_path` lexes as the graph-analytics keyword
398                // `Token::ShortestPath`, so it never reaches `expect_ident`.
399                // In FROM position it is the shortest-path table-valued
400                // function: `FROM shortest_path(g, src => .., dst => ..)`
401                // (issue #798). The bare `SHORTEST_PATH ... FROM ... TO ...`
402                // graph-command form is dispatched separately at statement
403                // start, so the two grammars never collide.
404                self.advance()?; // consume SHORTEST_PATH
405                let name = "shortest_path".to_string();
406                self.expect(Token::LParen)?;
407                let (args, named_args, subquery_args) = self.parse_table_function_args(&name)?;
408                self.expect(Token::RParen)?;
409                table_source = Some(self.build_table_function_source(
410                    name.clone(),
411                    args,
412                    named_args,
413                    subquery_args,
414                )?);
415                name
416            } else {
417                let ident = self.expect_ident()?;
418                // Table-valued function call: `ident(arg, ...)` (issue #795).
419                if matches!(self.peek(), Token::LParen) {
420                    self.advance()?; // consume '('
421                    let (args, named_args, subquery_args) =
422                        self.parse_table_function_args(&ident)?;
423                    self.expect(Token::RParen)?;
424                    table_source = Some(self.build_table_function_source(
425                        ident.clone(),
426                        args,
427                        named_args,
428                        subquery_args,
429                    )?);
430                    ident
431                } else {
432                    // Dotted table name, e.g. the `<graph>.<output>` analytics
433                    // virtual view `g.communities` (issue #800) or a schema-
434                    // qualified virtual table such as `red.collections`. The
435                    // dotted form is kept verbatim in `table`; the runtime
436                    // resolves a real collection of that exact name first, then
437                    // falls back to analytics-view resolution.
438                    let mut name = ident;
439                    while matches!(self.peek(), Token::Dot) {
440                        self.advance()?; // consume '.'
441                        let segment = self.parse_table_name_segment()?;
442                        name.push('.');
443                        name.push_str(&segment);
444                    }
445                    name
446                }
447            }
448        } else {
449            "any".to_string()
450        };
451
452        // Parse optional alias (only when a FROM clause exists).
453        // `AS OF` is a clause — don't gobble the `AS` as an alias
454        // marker when the following token is `OF`.
455        let alias =
456            if !has_from || (self.check(&Token::As) && matches!(self.peek_next()?, Token::Of)) {
457                None
458            } else if self.consume(&Token::As)?
459                || (self.check(&Token::Ident("".into())) && !self.is_clause_keyword())
460            {
461                Some(self.expect_ident()?)
462            } else {
463                None
464            };
465
466        let mut query = if let Some(inner) = from_subquery {
467            let mut query = TableQuery::from_subquery(inner, alias);
468            query.select_items = select_items;
469            query.columns = columns;
470            query.distinct = distinct;
471            query
472        } else {
473            TableQuery {
474                table,
475                source: table_source,
476                alias,
477                select_items,
478                columns,
479                where_expr: None,
480                filter: None,
481                group_by_exprs: Vec::new(),
482                group_by: Vec::new(),
483                having_expr: None,
484                having: None,
485                order_by: Vec::new(),
486                limit: None,
487                limit_param: None,
488                offset: None,
489                offset_param: None,
490                expand: None,
491                as_of: None,
492                sessionize: None,
493                distinct,
494            }
495        };
496
497        if self.is_join_keyword() {
498            let return_items = std::mem::take(&mut query.select_items);
499            let return_ = std::mem::take(&mut query.columns);
500            let mut expr = self.parse_join_query(QueryExpr::Table(query))?;
501            if let QueryExpr::Join(join) = &mut expr {
502                join.return_items = return_items;
503                join.return_ = return_;
504            }
505            return Ok(expr);
506        }
507
508        // SESSIONIZE BY <ident> GAP <duration> [ORDER BY <ident>]
509        // — issue #585 slice 8. Parsed before WHERE/GROUP BY so the
510        // optional inner ORDER BY (which the user binds to the
511        // operator's timestamp axis) cannot be confused with the
512        // SELECT's top-level ORDER BY further down. Both `BY` and
513        // `GAP` may be omitted when the source collection's
514        // descriptor carries `SESSION_KEY` / `SESSION_GAP` defaults
515        // (slice 1) — the executor resolves them at run time and
516        // raises `MissingSessionKey` if neither side supplies a
517        // value.
518        if self.consume(&Token::Sessionize)? {
519            query.sessionize = Some(self.parse_sessionize_clause()?);
520        }
521
522        // Parse optional clauses
523        self.parse_table_clauses(&mut query)?;
524
525        Ok(QueryExpr::Table(query))
526    }
527
528    fn parse_sessionize_clause(&mut self) -> Result<crate::ast::SessionizeClause, ParseError> {
529        use crate::ast::SessionizeClause;
530
531        let mut clause = SessionizeClause::default();
532
533        if self.consume(&Token::By)? {
534            clause.actor_col = Some(self.expect_ident()?);
535        }
536        if self.consume(&Token::Gap)? {
537            let value = self.parse_float()?;
538            let unit = self.parse_duration_unit()?;
539            clause.gap_ms = Some((value * unit) as u64);
540        }
541        // Optional `ORDER BY <ident>` immediately after GAP. The
542        // top-level SELECT ORDER BY parsed by `parse_table_clauses`
543        // sees the next ORDER token, so this only consumes the one
544        // immediately attached to SESSIONIZE.
545        if self.consume(&Token::Order)? {
546            self.expect(Token::By)?;
547            clause.order_col = Some(self.expect_ident()?);
548        }
549        Ok(clause)
550    }
551}
552
553impl<'a> Parser<'a> {
554    /// Check if current identifier is a clause keyword
555    pub fn is_clause_keyword(&self) -> bool {
556        matches!(
557            self.peek(),
558            Token::Where
559                | Token::Order
560                | Token::Limit
561                | Token::Offset
562                | Token::Join
563                | Token::Inner
564                | Token::Left
565                | Token::Right
566                | Token::As
567                | Token::Sessionize
568        )
569    }
570
571    /// Parse projection list (column selections)
572    pub fn parse_projection_list(&mut self) -> Result<Vec<Projection>, ParseError> {
573        Ok(self.parse_select_items_and_projections()?.1)
574    }
575
576    pub(crate) fn parse_select_items_and_projections(
577        &mut self,
578    ) -> Result<(Vec<SelectItem>, Vec<Projection>), ParseError> {
579        // Handle SELECT *
580        if self.consume(&Token::Star)? {
581            return Ok((vec![SelectItem::Wildcard], Vec::new())); // Empty legacy vec means all columns
582        }
583
584        let mut select_items = Vec::new();
585        let mut projections = Vec::new();
586        loop {
587            let (item, proj) = self.parse_projection()?;
588            select_items.push(item);
589            projections.push(proj);
590
591            if !self.consume(&Token::Comma)? {
592                break;
593            }
594        }
595        Ok((select_items, projections))
596    }
597
598    /// Parse a single projection — supports columns, aggregate functions, and scalar functions
599    fn parse_projection(&mut self) -> Result<(SelectItem, Projection), ParseError> {
600        let expr = self.parse_expr()?;
601        if contains_nested_aggregate(&expr) && !is_plain_aggregate_expr(&expr) {
602            return Err(ParseError::new(
603                "aggregate function is not valid inside another expression".to_string(),
604                self.position(),
605            ));
606        }
607        let alias = if self.consume(&Token::As)? {
608            Some(self.expect_column_ident()?)
609        } else {
610            None
611        };
612        let select_item = SelectItem::Expr {
613            expr: expr.clone(),
614            alias: alias.clone(),
615        };
616        let projection = select_item_to_projection(&select_item).ok_or_else(|| {
617            ParseError::new(
618                "projection cannot yet be lowered to legacy runtime representation".to_string(),
619                self.position(),
620            )
621        })?;
622        Ok((select_item, projection))
623    }
624}
625
626fn contains_nested_aggregate(expr: &Expr) -> bool {
627    match expr {
628        Expr::FunctionCall { name, args, .. } => {
629            is_aggregate_function(&name.to_uppercase())
630                || args.iter().any(contains_nested_aggregate)
631        }
632        // Issue #589 slice 7a: a window function aggregate (e.g.
633        // `SUM(x) OVER (...)`) is NOT a plain aggregate from the
634        // group-by analyser's point of view — it operates over a
635        // partitioned window, not a GROUP BY group. We still recurse
636        // into args / partition / order keys so a *nested* aggregate
637        // (e.g. `SUM(COUNT(*) OVER ()) OVER (...)`) is caught.
638        Expr::WindowFunctionCall { args, window, .. } => {
639            args.iter().any(contains_nested_aggregate)
640                || window.partition_by.iter().any(contains_nested_aggregate)
641                || window
642                    .order_by
643                    .iter()
644                    .any(|o| contains_nested_aggregate(&o.expr))
645        }
646        Expr::BinaryOp { lhs, rhs, .. } => {
647            contains_nested_aggregate(lhs) || contains_nested_aggregate(rhs)
648        }
649        Expr::UnaryOp { operand, .. } | Expr::IsNull { operand, .. } => {
650            contains_nested_aggregate(operand)
651        }
652        Expr::Cast { inner, .. } => contains_nested_aggregate(inner),
653        Expr::Case {
654            branches, else_, ..
655        } => {
656            branches.iter().any(|(cond, value)| {
657                contains_nested_aggregate(cond) || contains_nested_aggregate(value)
658            }) || else_.as_deref().is_some_and(contains_nested_aggregate)
659        }
660        Expr::InList { target, values, .. } => {
661            contains_nested_aggregate(target) || values.iter().any(contains_nested_aggregate)
662        }
663        Expr::Between {
664            target, low, high, ..
665        } => {
666            contains_nested_aggregate(target)
667                || contains_nested_aggregate(low)
668                || contains_nested_aggregate(high)
669        }
670        Expr::Literal { .. }
671        | Expr::Column { .. }
672        | Expr::Parameter { .. }
673        | Expr::Subquery { .. } => false,
674    }
675}
676
677fn is_plain_aggregate_expr(expr: &Expr) -> bool {
678    match expr {
679        Expr::FunctionCall { name, args, .. } if is_aggregate_function(&name.to_uppercase()) => {
680            !args.iter().any(contains_nested_aggregate)
681        }
682        _ => false,
683    }
684}
685
686fn attach_projection_alias(proj: Projection, alias: Option<String>) -> Projection {
687    let Some(alias) = alias else { return proj };
688    match proj {
689        Projection::Field(field, _) => Projection::Field(field, Some(alias)),
690        Projection::Expression(filter, _) => Projection::Expression(filter, Some(alias)),
691        Projection::Function(name, args) => {
692            if name.contains(':') {
693                Projection::Function(name, args)
694            } else {
695                Projection::Function(format!("{name}:{alias}"), args)
696            }
697        }
698        Projection::Column(column) => Projection::Alias(column, alias),
699        Projection::Window {
700            name, args, window, ..
701        } => Projection::Window {
702            name,
703            args,
704            window,
705            alias: Some(alias),
706        },
707        other => other,
708    }
709}
710
711fn queue_projection_columns(columns: &[Projection]) -> Result<Vec<String>, ParseError> {
712    let mut out = Vec::new();
713    for column in columns {
714        match column {
715            Projection::Column(name) => out.push(name.clone()),
716            Projection::Alias(name, _) => out.push(name.clone()),
717            Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
718                out.push(column.clone());
719            }
720            Projection::All => return Ok(Vec::new()),
721            other => {
722                return Err(ParseError::new(
723                    format!(
724                        "unsupported SELECT FROM QUEUE projection {other:?}; use `SELECT *` or bare column names, or use queue verbs (PUSH, POP, PEEK, LEN, ACK, NACK, …) for queue operations"
725                    ),
726                    crate::lexer::Position::default(),
727                ));
728            }
729        }
730    }
731    Ok(out)
732}
733
734impl<'a> Parser<'a> {
735    /// Parse table query clauses (AS OF, WHERE, GROUP BY, HAVING, ORDER BY, LIMIT, OFFSET)
736    pub fn parse_table_clauses(&mut self, query: &mut TableQuery) -> Result<(), ParseError> {
737        // AS OF clause — time-travel anchor. Must come before WHERE
738        // so the executor can bind the snapshot before filter eval.
739        if self.check(&Token::As) {
740            let next_is_of = matches!(self.peek_next()?, Token::Of);
741            if next_is_of {
742                self.expect(Token::As)?;
743                self.expect(Token::Of)?;
744                query.as_of = Some(self.parse_as_of_spec()?);
745            }
746        }
747
748        // WHERE clause
749        if self.consume(&Token::Where)? {
750            let filter = self.parse_filter()?;
751            query.where_expr = Some(filter_to_expr(&filter));
752            query.filter = Some(filter);
753        }
754
755        // GROUP BY clause
756        if self.consume(&Token::Group)? {
757            self.expect(Token::By)?;
758            let (group_by_exprs, group_by) = self.parse_group_by_items()?;
759            query.group_by_exprs = group_by_exprs;
760            query.group_by = group_by;
761        }
762
763        // HAVING clause (only valid after GROUP BY)
764        if !query.group_by_exprs.is_empty() && self.consume_ident_ci("HAVING")? {
765            let having = self.parse_filter()?;
766            query.having_expr = Some(filter_to_expr(&having));
767            query.having = Some(having);
768        }
769
770        // ORDER BY clause
771        if self.consume(&Token::Order)? {
772            self.expect(Token::By)?;
773            query.order_by = self.parse_order_by_list()?;
774        }
775
776        // LIMIT clause
777        if self.consume(&Token::Limit)? {
778            if matches!(self.peek(), Token::Dollar | Token::Question) {
779                query.limit_param = Some(self.parse_param_slot("LIMIT")?);
780                query.limit = None;
781            } else {
782                query.limit = Some(self.parse_integer()? as u64);
783            }
784        }
785
786        // OFFSET clause
787        if self.consume(&Token::Offset)? {
788            if matches!(self.peek(), Token::Dollar | Token::Question) {
789                query.offset_param = Some(self.parse_param_slot("OFFSET")?);
790                query.offset = None;
791            } else {
792                query.offset = Some(self.parse_integer()? as u64);
793            }
794        }
795
796        // WITH EXPAND clause
797        if self.consume(&Token::With)? && self.consume_ident_ci("EXPAND")? {
798            query.expand = Some(self.parse_expand_options()?);
799        }
800
801        Ok(())
802    }
803
804    /// Parse an AS OF spec after `AS OF` has already been consumed.
805    /// Grammar:
806    ///   AS OF COMMIT   '<hex>'
807    ///   AS OF BRANCH   '<name>'
808    ///   AS OF TAG      '<name>'
809    ///   AS OF TIMESTAMP <integer-ms>
810    ///   AS OF SNAPSHOT  <xid>
811    fn parse_as_of_spec(&mut self) -> Result<crate::ast::AsOfClause, ParseError> {
812        use crate::ast::AsOfClause;
813
814        // Keyword — accept both tokenized forms (e.g. Token::Commit
815        // if present) and bare identifiers for flexibility.
816        let keyword = match self.peek() {
817            Token::Ident(s) => {
818                let s = s.to_ascii_uppercase();
819                self.advance()?;
820                s
821            }
822            Token::Commit => {
823                self.advance()?;
824                "COMMIT".to_string()
825            }
826            other => {
827                return Err(ParseError::expected(
828                    vec!["COMMIT", "BRANCH", "TAG", "TIMESTAMP", "SNAPSHOT"],
829                    other,
830                    self.position(),
831                ));
832            }
833        };
834
835        match keyword.as_str() {
836            "COMMIT" => {
837                let value = self.parse_string()?;
838                Ok(AsOfClause::Commit(value))
839            }
840            "BRANCH" => {
841                let value = self.parse_string()?;
842                Ok(AsOfClause::Branch(value))
843            }
844            "TAG" => {
845                let value = self.parse_string()?;
846                Ok(AsOfClause::Tag(value))
847            }
848            "TIMESTAMP" => {
849                let value = self.parse_integer()?;
850                Ok(AsOfClause::TimestampMs(value))
851            }
852            "SNAPSHOT" => {
853                let value = self.parse_integer()?;
854                if value < 0 {
855                    return Err(ParseError::new(
856                        "AS OF SNAPSHOT requires non-negative xid".to_string(),
857                        self.position(),
858                    ));
859                }
860                Ok(AsOfClause::Snapshot(value as u64))
861            }
862            other => Err(ParseError::expected(
863                vec!["COMMIT", "BRANCH", "TAG", "TIMESTAMP", "SNAPSHOT"],
864                &Token::Ident(other.into()),
865                self.position(),
866            )),
867        }
868    }
869
870    /// Parse EXPAND options: GRAPH [DEPTH n], CROSS_REFS, ALL
871    fn parse_expand_options(&mut self) -> Result<crate::ast::ExpandOptions, ParseError> {
872        use crate::ast::ExpandOptions;
873        let mut opts = ExpandOptions::default();
874
875        loop {
876            if self.consume(&Token::Graph)? || self.consume_ident_ci("GRAPH")? {
877                opts.graph = true;
878                opts.graph_depth = if self.consume(&Token::Depth)? {
879                    self.parse_integer()? as usize
880                } else {
881                    1
882                };
883            } else if self.consume_ident_ci("CROSS_REFS")?
884                || self.consume_ident_ci("CROSSREFS")?
885                || self.consume_ident_ci("REFS")?
886            {
887                opts.cross_refs = true;
888            } else if self.consume(&Token::All)? || self.consume_ident_ci("ALL")? {
889                opts.graph = true;
890                opts.cross_refs = true;
891                opts.graph_depth = 1;
892            } else {
893                break;
894            }
895            if !self.consume(&Token::Comma)? {
896                break;
897            }
898        }
899
900        if !opts.graph && !opts.cross_refs {
901            opts.graph = true;
902            opts.cross_refs = true;
903            opts.graph_depth = 1;
904        }
905
906        Ok(opts)
907    }
908
909    /// Parse GROUP BY field list
910    pub fn parse_group_by_list(&mut self) -> Result<Vec<String>, ParseError> {
911        Ok(self.parse_group_by_items()?.1)
912    }
913
914    fn parse_group_by_items(&mut self) -> Result<(Vec<Expr>, Vec<String>), ParseError> {
915        let mut exprs = Vec::new();
916        let mut fields = Vec::new();
917        loop {
918            let expr = self.parse_expr()?;
919            let rendered = render_group_by_expr(&expr).ok_or_else(|| {
920                ParseError::new(
921                    "GROUP BY expression cannot yet be lowered to legacy runtime representation"
922                        .to_string(),
923                    self.position(),
924                )
925            })?;
926            exprs.push(expr);
927            fields.push(rendered);
928            if !self.consume(&Token::Comma)? {
929                break;
930            }
931        }
932        Ok((exprs, fields))
933    }
934
935    /// Parse ORDER BY list.
936    ///
937    /// Fase 1.6 unlock: uses the new `Expr` Pratt parser so
938    /// `ORDER BY CAST(age AS INT)`, `ORDER BY a + b * 2`,
939    /// `ORDER BY last_seen - created_at` all parse cleanly. If the
940    /// parsed expression is a bare `Column`, we store it in the
941    /// legacy `field` slot and leave `expr` None so downstream
942    /// consumers (planner cost, mode translators) keep using the
943    /// fast path. Otherwise we stash the full tree in `expr` and
944    /// populate `field` with a synthetic marker that runtime code
945    /// never touches.
946    pub fn parse_order_by_list(&mut self) -> Result<Vec<OrderByClause>, ParseError> {
947        use crate::ast::Expr as AstExpr;
948        let mut clauses = Vec::new();
949        loop {
950            let parsed = self.parse_expr()?;
951            let (field, expr_slot) = match parsed {
952                AstExpr::Column { field, .. } => (field, None),
953                other => (
954                    // Synthetic placeholder so legacy pattern-matches
955                    // on `OrderByClause.field` still destructure.
956                    // Runtime comparators check `expr` first when set,
957                    // so the sentinel never gets resolved against a
958                    // real record.
959                    FieldRef::TableColumn {
960                        table: String::new(),
961                        column: String::new(),
962                    },
963                    Some(other),
964                ),
965            };
966
967            let ascending = if self.consume(&Token::Desc)? {
968                false
969            } else {
970                self.consume(&Token::Asc)?;
971                true
972            };
973
974            let nulls_first = if self.consume(&Token::Nulls)? {
975                if self.consume(&Token::First)? {
976                    true
977                } else {
978                    self.expect(Token::Last)?;
979                    false
980                }
981            } else {
982                !ascending // Default: nulls last for ASC, first for DESC
983            };
984
985            clauses.push(OrderByClause {
986                field,
987                expr: expr_slot,
988                ascending,
989                nulls_first,
990            });
991
992            if !self.consume(&Token::Comma)? {
993                break;
994            }
995        }
996        Ok(clauses)
997    }
998
999    fn parse_function_literal_arg(&mut self) -> Result<String, ParseError> {
1000        let negative = self.consume(&Token::Dash)?;
1001        let mut literal = match self.advance()? {
1002            Token::Integer(n) => {
1003                if negative {
1004                    format!("-{n}")
1005                } else {
1006                    n.to_string()
1007                }
1008            }
1009            Token::Float(n) => {
1010                let value = if negative { -n } else { n };
1011                if value.fract().abs() < f64::EPSILON {
1012                    format!("{}", value as i64)
1013                } else {
1014                    value.to_string()
1015                }
1016            }
1017            other => {
1018                return Err(ParseError::new(
1019                    // F-05: `other` is a `Token` whose Display arms emit raw
1020                    // user bytes for `Ident` / `String` / `JsonLiteral`.
1021                    // Render via `{:?}` so CR/LF/NUL/quotes are escaped
1022                    // before the message reaches downstream serialization
1023                    // sinks.
1024                    format!("expected number, got {:?}", other),
1025                    self.position(),
1026                ));
1027            }
1028        };
1029
1030        if let Token::Ident(unit) = self.peek().clone() {
1031            if is_duration_unit(&unit) {
1032                self.advance()?;
1033                literal.push_str(&unit.to_ascii_lowercase());
1034            }
1035        }
1036
1037        Ok(literal)
1038    }
1039}
1040
/// Returns `true` when `unit` is one of the recognised duration suffixes
/// (milliseconds through days), compared ASCII case-insensitively.
fn is_duration_unit(unit: &str) -> bool {
    const UNITS: &[&str] = &[
        // milliseconds
        "ms",
        "msec",
        "millisecond",
        "milliseconds",
        // seconds
        "s",
        "sec",
        "secs",
        "second",
        "seconds",
        // minutes
        "m",
        "min",
        "mins",
        "minute",
        "minutes",
        // hours
        "h",
        "hr",
        "hrs",
        "hour",
        "hours",
        // days
        "d",
        "day",
        "days",
    ];
    UNITS.iter().any(|known| unit.eq_ignore_ascii_case(known))
}
1067
1068fn render_group_by_expr(expr: &Expr) -> Option<String> {
1069    match expr {
1070        Expr::Column { field, .. } => match field {
1071            FieldRef::TableColumn { table, column } if table.is_empty() => Some(column.clone()),
1072            FieldRef::TableColumn { table, column } => Some(format!("{table}.{column}")),
1073            other => Some(format!("{other:?}")),
1074        },
1075        Expr::FunctionCall { name, args, .. } if name.eq_ignore_ascii_case("TIME_BUCKET") => {
1076            let rendered = args
1077                .iter()
1078                .map(render_group_by_expr)
1079                .collect::<Option<Vec<_>>>()?;
1080            Some(format!("TIME_BUCKET({})", rendered.join(",")))
1081        }
1082        Expr::Literal { value, .. } => Some(match value {
1083            Value::Null => String::new(),
1084            Value::Text(text) => text.to_string(),
1085            other => other.to_string(),
1086        }),
1087        _ => expr_to_projection(expr).map(|projection| match projection {
1088            Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1089                column
1090            }
1091            Projection::Field(FieldRef::TableColumn { table, column }, _) => {
1092                format!("{table}.{column}")
1093            }
1094            Projection::Function(name, args) => {
1095                let rendered = args
1096                    .iter()
1097                    .map(render_group_by_function_arg)
1098                    .collect::<Option<Vec<_>>>()
1099                    .unwrap_or_default();
1100                format!(
1101                    "{}({})",
1102                    name.split(':').next().unwrap_or(&name),
1103                    rendered.join(",")
1104                )
1105            }
1106            Projection::Column(column) | Projection::Alias(column, _) => column,
1107            Projection::All => "*".to_string(),
1108            Projection::Expression(_, _) => "expr".to_string(),
1109            Projection::Field(other, _) => format!("{other:?}"),
1110            Projection::Window { name, .. } => name,
1111        }),
1112    }
1113}
1114
1115fn render_group_by_function_arg(arg: &Projection) -> Option<String> {
1116    match arg {
1117        Projection::Column(col) => Some(
1118            col.strip_prefix("LIT:")
1119                .map(str::to_string)
1120                .unwrap_or_else(|| col.clone()),
1121        ),
1122        Projection::All => Some("*".to_string()),
1123        _ => None,
1124    }
1125}
1126
1127#[cfg(test)]
1128mod tests {
1129    use super::*;
1130    use crate::ast::{AsOfClause, BinOp, CompareOp, ExpandOptions, TableSource};
1131
1132    fn parse_table(sql: &str) -> TableQuery {
1133        let parsed = crate::parser::parse(sql).unwrap().query;
1134        let QueryExpr::Table(table) = parsed else {
1135            panic!("expected table query");
1136        };
1137        table
1138    }
1139
1140    fn col(name: &str) -> Expr {
1141        Expr::Column {
1142            field: FieldRef::TableColumn {
1143                table: String::new(),
1144                column: name.to_string(),
1145            },
1146            span: Span::synthetic(),
1147        }
1148    }
1149
1150    #[test]
1151    fn helper_function_catalogs_cover_all_names() {
1152        for name in [
1153            "GEO_DISTANCE",
1154            "GEO_DISTANCE_VINCENTY",
1155            "GEO_BEARING",
1156            "GEO_MIDPOINT",
1157            "HAVERSINE",
1158            "VINCENTY",
1159            "TIME_BUCKET",
1160            "UPPER",
1161            "LOWER",
1162            "LENGTH",
1163            "CHAR_LENGTH",
1164            "CHARACTER_LENGTH",
1165            "OCTET_LENGTH",
1166            "BIT_LENGTH",
1167            "SUBSTRING",
1168            "SUBSTR",
1169            "POSITION",
1170            "TRIM",
1171            "LTRIM",
1172            "RTRIM",
1173            "BTRIM",
1174            "CONCAT",
1175            "CONCAT_WS",
1176            "REVERSE",
1177            "LEFT",
1178            "RIGHT",
1179            "QUOTE_LITERAL",
1180            "ABS",
1181            "ROUND",
1182            "COALESCE",
1183            "STDDEV",
1184            "VARIANCE",
1185            "MEDIAN",
1186            "PERCENTILE",
1187            "GROUP_CONCAT",
1188            "STRING_AGG",
1189            "FIRST",
1190            "LAST",
1191            "ARRAY_AGG",
1192            "COUNT_DISTINCT",
1193            "MONEY",
1194            "MONEY_ASSET",
1195            "MONEY_MINOR",
1196            "MONEY_SCALE",
1197            "VERIFY_PASSWORD",
1198            "CAST",
1199            "CASE",
1200        ] {
1201            assert!(is_scalar_function(name), "{name}");
1202        }
1203        assert!(!is_scalar_function("NOT_A_FUNCTION"));
1204
1205        for name in [
1206            "COUNT",
1207            "AVG",
1208            "SUM",
1209            "MIN",
1210            "MAX",
1211            "STDDEV",
1212            "VARIANCE",
1213            "MEDIAN",
1214            "PERCENTILE",
1215            "GROUP_CONCAT",
1216            "STRING_AGG",
1217            "FIRST",
1218            "LAST",
1219            "ARRAY_AGG",
1220            "COUNT_DISTINCT",
1221        ] {
1222            assert!(is_aggregate_function(name), "{name}");
1223        }
1224        assert!(!is_aggregate_function("LOWER"));
1225
1226        assert_eq!(aggregate_token_name(&Token::Count), Some("COUNT"));
1227        assert_eq!(aggregate_token_name(&Token::Sum), Some("SUM"));
1228        assert_eq!(aggregate_token_name(&Token::Avg), Some("AVG"));
1229        assert_eq!(aggregate_token_name(&Token::Min), Some("MIN"));
1230        assert_eq!(aggregate_token_name(&Token::Max), Some("MAX"));
1231        assert_eq!(aggregate_token_name(&Token::First), Some("FIRST"));
1232        assert_eq!(aggregate_token_name(&Token::Last), Some("LAST"));
1233        assert_eq!(aggregate_token_name(&Token::Ident("COUNT".into())), None);
1234
1235        assert_eq!(scalar_token_name(&Token::Left), Some("LEFT"));
1236        assert_eq!(scalar_token_name(&Token::Right), Some("RIGHT"));
1237        assert_eq!(scalar_token_name(&Token::Ident("LEFT".into())), None);
1238
1239        for unit in [
1240            "ms",
1241            "msec",
1242            "millisecond",
1243            "milliseconds",
1244            "s",
1245            "sec",
1246            "secs",
1247            "second",
1248            "seconds",
1249            "m",
1250            "min",
1251            "mins",
1252            "minute",
1253            "minutes",
1254            "h",
1255            "hr",
1256            "hrs",
1257            "hour",
1258            "hours",
1259            "d",
1260            "day",
1261            "days",
1262        ] {
1263            assert!(is_duration_unit(unit), "{unit}");
1264        }
1265        assert!(!is_duration_unit("fortnight"));
1266    }
1267
1268    #[test]
1269    fn projection_and_group_render_helpers_cover_aliases_and_exprs() {
1270        let field = FieldRef::TableColumn {
1271            table: String::new(),
1272            column: "name".into(),
1273        };
1274        let filter = Filter::Compare {
1275            field: field.clone(),
1276            op: CompareOp::Eq,
1277            value: Value::text("alice"),
1278        };
1279
1280        assert_eq!(
1281            attach_projection_alias(Projection::Field(field.clone(), None), Some("n".into())),
1282            Projection::Field(field.clone(), Some("n".into()))
1283        );
1284        assert_eq!(
1285            attach_projection_alias(
1286                Projection::Expression(Box::new(filter.clone()), None),
1287                Some("ok".into())
1288            ),
1289            Projection::Expression(Box::new(filter), Some("ok".into()))
1290        );
1291        assert_eq!(
1292            attach_projection_alias(
1293                Projection::Function("LOWER".into(), vec![]),
1294                Some("l".into())
1295            ),
1296            Projection::Function("LOWER:l".into(), vec![])
1297        );
1298        assert_eq!(
1299            attach_projection_alias(
1300                Projection::Function("LOWER:l".into(), vec![]),
1301                Some("ignored".into())
1302            ),
1303            Projection::Function("LOWER:l".into(), vec![])
1304        );
1305        assert_eq!(
1306            attach_projection_alias(Projection::Column("name".into()), Some("n".into())),
1307            Projection::Alias("name".into(), "n".into())
1308        );
1309        assert_eq!(
1310            attach_projection_alias(Projection::All, Some("ignored".into())),
1311            Projection::All
1312        );
1313
1314        assert_eq!(render_group_by_expr(&col("dept")).as_deref(), Some("dept"));
1315        assert_eq!(
1316            render_group_by_expr(&Expr::Column {
1317                field: FieldRef::TableColumn {
1318                    table: "employees".into(),
1319                    column: "dept".into()
1320                },
1321                span: Span::synthetic()
1322            })
1323            .as_deref(),
1324            Some("employees.dept")
1325        );
1326        assert_eq!(
1327            render_group_by_expr(&Expr::Column {
1328                field: FieldRef::NodeId { alias: "n".into() },
1329                span: Span::synthetic()
1330            }),
1331            Some("NodeId { alias: \"n\" }".into())
1332        );
1333        assert_eq!(
1334            render_group_by_expr(&Expr::Literal {
1335                value: Value::Null,
1336                span: Span::synthetic()
1337            })
1338            .as_deref(),
1339            Some("")
1340        );
1341        assert_eq!(
1342            render_group_by_expr(&Expr::Literal {
1343                value: Value::text("5m"),
1344                span: Span::synthetic()
1345            })
1346            .as_deref(),
1347            Some("5m")
1348        );
1349        assert_eq!(
1350            render_group_by_expr(&Expr::Literal {
1351                value: Value::Integer(7),
1352                span: Span::synthetic()
1353            })
1354            .as_deref(),
1355            Some("7")
1356        );
1357        assert_eq!(
1358            render_group_by_expr(&Expr::FunctionCall {
1359                name: "TIME_BUCKET".into(),
1360                args: vec![
1361                    col("ts"),
1362                    Expr::Literal {
1363                        value: Value::text("5m"),
1364                        span: Span::synthetic()
1365                    }
1366                ],
1367                span: Span::synthetic()
1368            })
1369            .as_deref(),
1370            Some("TIME_BUCKET(ts,5m)")
1371        );
1372        assert_eq!(
1373            render_group_by_expr(&Expr::FunctionCall {
1374                name: "LOWER".into(),
1375                args: vec![col("dept")],
1376                span: Span::synthetic()
1377            })
1378            .as_deref(),
1379            Some("LOWER()")
1380        );
1381
1382        assert_eq!(
1383            render_group_by_function_arg(&Projection::Column("LIT:5m".into())),
1384            Some("5m".into())
1385        );
1386        assert_eq!(
1387            render_group_by_function_arg(&Projection::Column("dept".into())),
1388            Some("dept".into())
1389        );
1390        assert_eq!(
1391            render_group_by_function_arg(&Projection::All),
1392            Some("*".into())
1393        );
1394        assert_eq!(
1395            render_group_by_function_arg(&Projection::Function("LOWER".into(), vec![])),
1396            None
1397        );
1398    }
1399
1400    #[test]
1401    fn expression_aggregate_detection_branches() {
1402        let count = Expr::FunctionCall {
1403            name: "COUNT".into(),
1404            args: vec![col("id")],
1405            span: Span::synthetic(),
1406        };
1407        assert!(contains_nested_aggregate(&count));
1408        assert!(is_plain_aggregate_expr(&count));
1409
1410        let nested = Expr::FunctionCall {
1411            name: "SUM".into(),
1412            args: vec![count.clone()],
1413            span: Span::synthetic(),
1414        };
1415        assert!(contains_nested_aggregate(&nested));
1416        assert!(!is_plain_aggregate_expr(&nested));
1417
1418        let binary = Expr::BinaryOp {
1419            op: BinOp::Add,
1420            lhs: Box::new(col("a")),
1421            rhs: Box::new(count.clone()),
1422            span: Span::synthetic(),
1423        };
1424        assert!(contains_nested_aggregate(&binary));
1425
1426        let unary = Expr::UnaryOp {
1427            op: UnaryOp::Not,
1428            operand: Box::new(count.clone()),
1429            span: Span::synthetic(),
1430        };
1431        assert!(contains_nested_aggregate(&unary));
1432
1433        let cast = Expr::Cast {
1434            inner: Box::new(count.clone()),
1435            target: reddb_types::types::DataType::Integer,
1436            span: Span::synthetic(),
1437        };
1438        assert!(contains_nested_aggregate(&cast));
1439
1440        let case = Expr::Case {
1441            branches: vec![(col("flag"), count.clone())],
1442            else_: Some(Box::new(col("fallback"))),
1443            span: Span::synthetic(),
1444        };
1445        assert!(contains_nested_aggregate(&case));
1446
1447        let in_list = Expr::InList {
1448            target: Box::new(col("id")),
1449            values: vec![count.clone()],
1450            negated: false,
1451            span: Span::synthetic(),
1452        };
1453        assert!(contains_nested_aggregate(&in_list));
1454
1455        let between = Expr::Between {
1456            target: Box::new(col("id")),
1457            low: Box::new(col("low")),
1458            high: Box::new(count),
1459            negated: false,
1460            span: Span::synthetic(),
1461        };
1462        assert!(contains_nested_aggregate(&between));
1463        assert!(!contains_nested_aggregate(&Expr::Parameter {
1464            index: 1,
1465            span: Span::synthetic()
1466        }));
1467
1468        assert!(crate::parser::parse("SELECT SUM(COUNT(id)) FROM t").is_err());
1469    }
1470
1471    #[test]
1472    fn table_clause_parsing_covers_as_of_order_offset_and_expand() {
1473        let table = parse_table(
1474            "SELECT name FROM users AS OF COMMIT 'abc123' \
1475             WHERE deleted_at IS NULL \
1476             ORDER BY LOWER(name) ASC NULLS FIRST, created_at DESC NULLS LAST \
1477             LIMIT 10 OFFSET 5 WITH EXPAND GRAPH DEPTH 3, CROSS_REFS",
1478        );
1479        assert!(matches!(table.as_of, Some(AsOfClause::Commit(ref v)) if v == "abc123"));
1480        assert!(table.filter.is_some());
1481        assert_eq!(table.order_by.len(), 2);
1482        assert!(table.order_by[0].expr.is_some());
1483        assert!(table.order_by[0].ascending);
1484        assert!(table.order_by[0].nulls_first);
1485        assert!(!table.order_by[1].ascending);
1486        assert!(!table.order_by[1].nulls_first);
1487        assert_eq!(table.limit, Some(10));
1488        assert_eq!(table.offset, Some(5));
1489        assert!(matches!(
1490            table.expand,
1491            Some(ExpandOptions {
1492                graph: true,
1493                graph_depth: 3,
1494                cross_refs: true,
1495                ..
1496            })
1497        ));
1498
1499        let table = parse_table("SELECT * FROM users AS OF BRANCH 'main'");
1500        assert!(matches!(table.as_of, Some(AsOfClause::Branch(ref v)) if v == "main"));
1501
1502        let table = parse_table("SELECT * FROM users AS OF TAG 'v1'");
1503        assert!(matches!(table.as_of, Some(AsOfClause::Tag(ref v)) if v == "v1"));
1504
1505        let table = parse_table("SELECT * FROM users AS OF TIMESTAMP 1710000000000");
1506        assert!(matches!(
1507            table.as_of,
1508            Some(AsOfClause::TimestampMs(1_710_000_000_000))
1509        ));
1510
1511        let table = parse_table("SELECT * FROM users AS OF SNAPSHOT 42");
1512        assert!(matches!(table.as_of, Some(AsOfClause::Snapshot(42))));
1513
1514        let table = parse_table("SELECT * FROM users WITH EXPAND");
1515        assert!(matches!(
1516            table.expand,
1517            Some(ExpandOptions {
1518                graph: true,
1519                graph_depth: 1,
1520                cross_refs: true,
1521                ..
1522            })
1523        ));
1524
1525        assert!(crate::parser::parse("SELECT * FROM users AS OF SNAPSHOT -1").is_err());
1526        assert!(crate::parser::parse("SELECT * FROM users AS OF UNKNOWN 'x'").is_err());
1527    }
1528
1529    #[test]
1530    fn direct_parser_helpers_cover_projection_group_order_and_literals() {
1531        let mut parser = Parser::new("name, LOWER(email) AS email_l").unwrap();
1532        let projections = parser.parse_projection_list().unwrap();
1533        assert_eq!(projections.len(), 2);
1534
1535        let mut parser = Parser::new("dept, TIME_BUCKET(5 m)").unwrap();
1536        let group_by = parser.parse_group_by_list().unwrap();
1537        assert_eq!(group_by, vec!["dept", "TIME_BUCKET(5m)"]);
1538
1539        let mut parser = Parser::new("LOWER(name) DESC, created_at").unwrap();
1540        let order_by = parser.parse_order_by_list().unwrap();
1541        assert_eq!(order_by.len(), 2);
1542        assert!(order_by[0].expr.is_some());
1543        assert!(!order_by[0].ascending);
1544        assert!(order_by[0].nulls_first);
1545        assert!(order_by[1].ascending);
1546        assert!(!order_by[1].nulls_first);
1547
1548        let mut parser = Parser::new("-5 ms").unwrap();
1549        assert_eq!(parser.parse_function_literal_arg().unwrap(), "-5ms");
1550        let mut parser = Parser::new("2.0 H").unwrap();
1551        assert_eq!(parser.parse_function_literal_arg().unwrap(), "2h");
1552        let mut parser = Parser::new("bad").unwrap();
1553        assert!(parser.parse_function_literal_arg().is_err());
1554    }
1555
1556    #[test]
1557    fn from_subquery_source_is_preserved() {
1558        let parsed = crate::parser::parse("FROM (SELECT id FROM users) AS u RETURN u.id")
1559            .unwrap()
1560            .query;
1561        let QueryExpr::Table(table) = parsed else {
1562            panic!("expected table query");
1563        };
1564        assert_eq!(table.table, "__subq_u");
1565        assert_eq!(table.alias.as_deref(), Some("u"));
1566        assert!(matches!(table.source, Some(TableSource::Subquery(_))));
1567        assert_eq!(table.select_items.len(), 1);
1568
1569        let parsed = crate::parser::parse("SELECT id FROM (SELECT id FROM users) AS u")
1570            .unwrap()
1571            .query;
1572        let QueryExpr::Table(table) = parsed else {
1573            panic!("expected table query");
1574        };
1575        assert_eq!(table.table, "__subq_u");
1576        assert_eq!(table.alias.as_deref(), Some("u"));
1577        assert!(matches!(table.source, Some(TableSource::Subquery(_))));
1578        assert_eq!(table.select_items.len(), 1);
1579
1580        assert!(crate::parser::parse("FROM (MATCH (n) RETURN n) AS g").is_err());
1581        assert!(crate::parser::parse("SELECT * FROM (MATCH (n) RETURN n) AS g").is_err());
1582    }
1583
1584    // ── Table-valued function arguments (issues #795 / #796) ──
1585
1586    #[test]
1587    fn louvain_tvf_parses_positional_and_named_args() {
1588        // Bare positional form: louvain(<graph>).
1589        let table = parse_table("SELECT * FROM louvain(g)");
1590        match table.source {
1591            Some(TableSource::Function {
1592                ref name,
1593                ref args,
1594                ref named_args,
1595            }) => {
1596                assert_eq!(name, "louvain");
1597                assert_eq!(args, &vec!["g".to_string()]);
1598                assert!(named_args.is_empty());
1599            }
1600            other => panic!("expected louvain TVF source, got {other:?}"),
1601        }
1602
1603        // Named-argument form: louvain(<graph>, resolution => <f64>).
1604        let table = parse_table("SELECT * FROM louvain(g, resolution => 0.5)");
1605        match table.source {
1606            Some(TableSource::Function {
1607                ref name,
1608                ref args,
1609                ref named_args,
1610            }) => {
1611                assert_eq!(name, "louvain");
1612                assert_eq!(args, &vec!["g".to_string()]);
1613                assert_eq!(named_args.len(), 1);
1614                assert_eq!(named_args[0].0, "resolution");
1615                assert!((named_args[0].1 - 0.5).abs() < f64::EPSILON);
1616            }
1617            other => panic!("expected louvain TVF source, got {other:?}"),
1618        }
1619
1620        // Integer resolution is accepted and coerced to f64.
1621        let table = parse_table("SELECT * FROM louvain(g, resolution => 2)");
1622        match table.source {
1623            Some(TableSource::Function { ref named_args, .. }) => {
1624                assert!((named_args[0].1 - 2.0).abs() < f64::EPSILON);
1625            }
1626            other => panic!("expected louvain TVF source, got {other:?}"),
1627        }
1628    }
1629
1630    // ── Inline graph TVF: `nodes => / edges =>` subqueries (issue #799) ──
1631
1632    #[test]
1633    fn tvf_inline_form_parses_nodes_and_edges_subqueries() {
1634        // The inline form must produce a structurally distinct AST node
1635        // (InlineGraphFunction) from the graph-collection Function form.
1636        let table = parse_table(
1637            "SELECT * FROM components(nodes => (SELECT id FROM hosts), edges => (SELECT src, dst FROM links))",
1638        );
1639        match table.source {
1640            Some(TableSource::InlineGraphFunction {
1641                ref name,
1642                ref nodes,
1643                ref edges,
1644                ref named_args,
1645            }) => {
1646                assert_eq!(name, "components");
1647                assert!(named_args.is_empty());
1648                assert!(matches!(**nodes, QueryExpr::Table(_)));
1649                assert!(matches!(**edges, QueryExpr::Table(_)));
1650            }
1651            other => panic!("expected inline graph TVF source, got {other:?}"),
1652        }
1653    }
1654
1655    #[test]
1656    fn tvf_inline_form_carries_numeric_named_args() {
1657        // `resolution => <f64>` coexists with the inline subqueries.
1658        let table = parse_table(
1659            "SELECT * FROM louvain(nodes => (SELECT id FROM n), edges => (SELECT a, b FROM e), resolution => 0.5)",
1660        );
1661        match table.source {
1662            Some(TableSource::InlineGraphFunction {
1663                ref name,
1664                ref named_args,
1665                ..
1666            }) => {
1667                assert_eq!(name, "louvain");
1668                assert_eq!(named_args.len(), 1);
1669                assert_eq!(named_args[0].0, "resolution");
1670                assert!((named_args[0].1 - 0.5).abs() < f64::EPSILON);
1671            }
1672            other => panic!("expected inline graph TVF source, got {other:?}"),
1673        }
1674    }
1675
1676    #[test]
1677    fn tvf_inline_form_rejects_malformed_shapes() {
1678        // A positional graph argument cannot mix with inline subqueries.
1679        assert!(crate::parser::parse(
1680            "SELECT * FROM components(g, nodes => (SELECT id FROM n), edges => (SELECT a, b FROM e))"
1681        )
1682        .is_err());
1683        // The inline form requires both `nodes` and `edges`.
1684        assert!(
1685            crate::parser::parse("SELECT * FROM components(nodes => (SELECT id FROM n))").is_err()
1686        );
1687        assert!(
1688            crate::parser::parse("SELECT * FROM components(edges => (SELECT a, b FROM e))")
1689                .is_err()
1690        );
1691        // An unknown subquery key is rejected.
1692        assert!(crate::parser::parse(
1693            "SELECT * FROM components(nodes => (SELECT id FROM n), verts => (SELECT a, b FROM e))"
1694        )
1695        .is_err());
1696        // A `=>` followed by a non-SELECT parenthesised group is rejected.
1697        assert!(crate::parser::parse(
1698            "SELECT * FROM components(nodes => (1 + 2), edges => (SELECT a, b FROM e))"
1699        )
1700        .is_err());
1701    }
1702
1703    #[test]
1704    fn shortest_path_tvf_parses_graph_ref_with_scalar_named_args() {
1705        // Required src/dst only.
1706        let table = parse_table("SELECT * FROM shortest_path(g, src => 1, dst => 4)");
1707        match table.source {
1708            Some(TableSource::Function {
1709                ref name,
1710                ref args,
1711                ref named_args,
1712            }) => {
1713                assert_eq!(name, "shortest_path");
1714                assert_eq!(args, &vec!["g".to_string()]);
1715                assert_eq!(named_args.len(), 2);
1716                assert_eq!(named_args[0].0, "src");
1717                assert!((named_args[0].1 - 1.0).abs() < f64::EPSILON);
1718                assert_eq!(named_args[1].0, "dst");
1719                assert!((named_args[1].1 - 4.0).abs() < f64::EPSILON);
1720            }
1721            other => panic!("expected shortest_path TVF source, got {other:?}"),
1722        }
1723
1724        // Optional max_hops named argument is accepted alongside src/dst.
1725        let table =
1726            parse_table("SELECT * FROM shortest_path(g, src => 1, dst => 4, max_hops => 3)");
1727        match table.source {
1728            Some(TableSource::Function { ref named_args, .. }) => {
1729                assert_eq!(named_args.len(), 3);
1730                assert_eq!(named_args[2].0, "max_hops");
1731                assert!((named_args[2].1 - 3.0).abs() < f64::EPSILON);
1732            }
1733            other => panic!("expected shortest_path TVF source, got {other:?}"),
1734        }
1735    }
1736
1737    #[test]
1738    fn centrality_tvfs_parse_positional_and_named_args() {
1739        // Bare positional form for each centrality TVF (issue #797). These flow
1740        // through the generic `ident(args)` path (not a dedicated keyword), so
1741        // the parser records them as a `TableSource::Function`.
1742        for name in ["betweenness", "eigenvector", "pagerank"] {
1743            let table = parse_table(&format!("SELECT * FROM {name}(g)"));
1744            match table.source {
1745                Some(TableSource::Function {
1746                    name: ref got,
1747                    ref args,
1748                    ref named_args,
1749                }) => {
1750                    assert_eq!(got, name);
1751                    assert_eq!(args, &vec!["g".to_string()]);
1752                    assert!(named_args.is_empty());
1753                }
1754                other => panic!("expected {name} TVF source, got {other:?}"),
1755            }
1756        }
1757
1758        // eigenvector(<graph>, max_iterations => <i64>, tolerance => <f64>).
1759        let table =
1760            parse_table("SELECT * FROM eigenvector(g, max_iterations => 50, tolerance => 0.0001)");
1761        match table.source {
1762            Some(TableSource::Function { ref named_args, .. }) => {
1763                assert_eq!(named_args.len(), 2);
1764                assert_eq!(named_args[0].0, "max_iterations");
1765                assert!((named_args[0].1 - 50.0).abs() < f64::EPSILON);
1766                assert_eq!(named_args[1].0, "tolerance");
1767                assert!((named_args[1].1 - 0.0001).abs() < f64::EPSILON);
1768            }
1769            other => panic!("expected eigenvector TVF source, got {other:?}"),
1770        }
1771
1772        // pagerank(<graph>, damping => <f64>, max_iterations => <i64>).
1773        let table =
1774            parse_table("SELECT * FROM pagerank(g, damping => 0.85, max_iterations => 100)");
1775        match table.source {
1776            Some(TableSource::Function {
1777                ref args,
1778                ref named_args,
1779                ..
1780            }) => {
1781                assert_eq!(args, &vec!["g".to_string()]);
1782                assert_eq!(named_args.len(), 2);
1783                assert_eq!(named_args[0].0, "damping");
1784                assert!((named_args[0].1 - 0.85).abs() < f64::EPSILON);
1785                assert_eq!(named_args[1].0, "max_iterations");
1786                assert!((named_args[1].1 - 100.0).abs() < f64::EPSILON);
1787            }
1788            other => panic!("expected pagerank TVF source, got {other:?}"),
1789        }
1790    }
1791
1792    #[test]
1793    fn tvf_named_arg_grammar_rejects_malformed_forms() {
1794        // A positional argument after a named argument is rejected.
1795        assert!(crate::parser::parse("SELECT * FROM louvain(g, resolution => 0.5, h)").is_err());
1796        // `=>` must be followed by a number.
1797        assert!(crate::parser::parse("SELECT * FROM louvain(g, resolution => foo)").is_err());
1798        // Zero-argument form is still rejected (issue #795 invariant).
1799        assert!(crate::parser::parse("SELECT * FROM louvain()").is_err());
1800    }
1801
1802    // ── SESSIONIZE operator (issue #585 slice 8) ──
1803
1804    #[test]
1805    fn test_parse_sessionize_full_clause() {
1806        let q = parse_table(
1807            "SELECT user_id, ts FROM events SESSIONIZE BY user_id GAP 30 m ORDER BY ts",
1808        );
1809        let s = q.sessionize.expect("sessionize present");
1810        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1811        assert_eq!(s.gap_ms, Some(30 * 60_000));
1812        assert_eq!(s.order_col.as_deref(), Some("ts"));
1813    }
1814
1815    #[test]
1816    fn test_parse_sessionize_omits_optional_order_by() {
1817        let q = parse_table("SELECT * FROM events SESSIONIZE BY user_id GAP 5 s");
1818        let s = q.sessionize.expect("sessionize present");
1819        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1820        assert_eq!(s.gap_ms, Some(5_000));
1821        assert!(s.order_col.is_none());
1822    }
1823
1824    #[test]
1825    fn test_parse_sessionize_bare_defers_to_descriptor() {
1826        // Both BY and GAP omitted — parser accepts the shape; the
1827        // executor raises MissingSessionKey when the descriptor
1828        // doesn't supply defaults.
1829        let q = parse_table("SELECT * FROM events SESSIONIZE");
1830        let s = q.sessionize.expect("sessionize present");
1831        assert!(s.actor_col.is_none());
1832        assert!(s.gap_ms.is_none());
1833        assert!(s.order_col.is_none());
1834    }
1835
1836    #[test]
1837    fn test_parse_sessionize_composes_with_where_and_limit() {
1838        let q = parse_table(
1839            "SELECT user_id FROM events \
1840             SESSIONIZE BY user_id GAP 1 m \
1841             WHERE user_id = 'u1' LIMIT 10",
1842        );
1843        let s = q.sessionize.expect("sessionize present");
1844        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1845        assert_eq!(s.gap_ms, Some(60_000));
1846        assert!(q.where_expr.is_some(), "WHERE still parsed");
1847        assert_eq!(q.limit, Some(10));
1848    }
1849
1850    #[test]
1851    fn test_parse_sessionize_absent_leaves_field_none() {
1852        let q = parse_table("SELECT * FROM events");
1853        assert!(q.sessionize.is_none());
1854    }
1855
1856    #[test]
1857    fn test_parse_sessionize_with_session_id_in_projection_e2e_shape() {
1858        // Matches the literal shape e2e tests use — session_id in the
1859        // projection list must not confuse the parser.
1860        let q = parse_table(
1861            "SELECT id, user_id, ts, session_id FROM events \
1862             SESSIONIZE BY user_id GAP 30 s ORDER BY ts",
1863        );
1864        let s = q.sessionize.expect("sessionize present");
1865        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1866        assert_eq!(s.gap_ms, Some(30_000));
1867    }
1868}