Skip to main content

reddb_server/storage/query/parser/
table.rs

1//! Table query parsing (SELECT ... FROM ...)
2
3use super::super::ast::{
4    BinOp, CompareOp, Expr, FieldRef, Filter, OrderByClause, Projection, QueryExpr,
5    QueueSelectQuery, SelectItem, Span, TableQuery, UnaryOp,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use crate::storage::query::sql_lowering::{
10    expr_to_projection, filter_to_expr, select_item_to_projection,
11};
12use crate::storage::schema::Value;
13
/// Reports whether `name` appears in this module's table of scalar
/// function names.
///
/// The lookup is an exact, case-sensitive string comparison against the
/// upper-case spellings below. Several entries (`STDDEV` … `COUNT_DISTINCT`)
/// also appear in the aggregate table consulted by `is_aggregate_function`.
fn is_scalar_function(name: &str) -> bool {
    const SCALAR_FUNCTIONS: &[&str] = &[
        // Geospatial helpers.
        "GEO_DISTANCE",
        "GEO_DISTANCE_VINCENTY",
        "GEO_BEARING",
        "GEO_MIDPOINT",
        "HAVERSINE",
        "VINCENTY",
        // Time bucketing.
        "TIME_BUCKET",
        // String functions.
        "UPPER",
        "LOWER",
        "LENGTH",
        "CHAR_LENGTH",
        "CHARACTER_LENGTH",
        "OCTET_LENGTH",
        "BIT_LENGTH",
        "SUBSTRING",
        "SUBSTR",
        "POSITION",
        "TRIM",
        "LTRIM",
        "RTRIM",
        "BTRIM",
        "CONCAT",
        "CONCAT_WS",
        "REVERSE",
        "LEFT",
        "RIGHT",
        "QUOTE_LITERAL",
        // Numeric / null handling.
        "ABS",
        "ROUND",
        "COALESCE",
        // Names shared with the aggregate table.
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
        // Money helpers.
        "MONEY",
        "MONEY_ASSET",
        "MONEY_MINOR",
        "MONEY_SCALE",
        // Miscellaneous.
        "VERIFY_PASSWORD",
        "CAST",
        "CASE",
    ];

    SCALAR_FUNCTIONS.contains(&name)
}
66
/// Reports whether `name` appears in this module's table of aggregate
/// function names.
///
/// The lookup is an exact, case-sensitive comparison; callers in this file
/// upper-case the name before asking.
fn is_aggregate_function(name: &str) -> bool {
    const AGGREGATE_FUNCTIONS: &[&str] = &[
        "COUNT",
        "AVG",
        "SUM",
        "MIN",
        "MAX",
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
    ];

    AGGREGATE_FUNCTIONS.contains(&name)
}
87
88fn aggregate_token_name(token: &Token) -> Option<&'static str> {
89    match token {
90        Token::Count => Some("COUNT"),
91        Token::Sum => Some("SUM"),
92        Token::Avg => Some("AVG"),
93        Token::Min => Some("MIN"),
94        Token::Max => Some("MAX"),
95        Token::First => Some("FIRST"),
96        Token::Last => Some("LAST"),
97        _ => None,
98    }
99}
100
101fn scalar_token_name(token: &Token) -> Option<&'static str> {
102    match token {
103        Token::Left => Some("LEFT"),
104        Token::Right => Some("RIGHT"),
105        _ => None,
106    }
107}
108use super::Parser;
109
110impl<'a> Parser<'a> {
111    /// Parse SELECT ... FROM ... query
112    pub fn parse_select_query(&mut self) -> Result<QueryExpr, ParseError> {
113        // Recursion guard: nested subqueries (UNION, derived tables,
114        // EXISTS) re-enter through this point, so depth here bounds
115        // the SELECT-shaped recursion in addition to the expr Pratt
116        // climb guarded in `parse_expr_prec`.
117        self.enter_depth()?;
118        let result = self.parse_select_query_inner();
119        self.exit_depth();
120        result
121    }
122
123    /// Parse the comma-separated argument list of a table-valued function
124    /// call. The opening `(` has already been consumed; the caller consumes
125    /// the closing `)`. Requires at least one argument and rejects malformed
126    /// forms (issue #795).
127    ///
128    /// Three argument shapes are accepted (issues #796 / #799):
129    /// - positional identifiers, e.g. the graph collection `g`;
130    /// - named numeric arguments `key => <number>`, e.g. `resolution => 0.5`;
131    /// - named subquery arguments `key => (<SELECT …>)`, e.g.
132    ///   `nodes => (SELECT id FROM hosts)` (the inline-graph form).
133    ///
134    /// Positional arguments must precede named arguments; a positional
135    /// argument after any named one is a clear error. Returns the positional
136    /// identifiers, the named numeric `(key, value)` pairs, and the named
137    /// subquery `(key, query)` pairs, each in source order.
138    #[allow(clippy::type_complexity)]
139    fn parse_table_function_args(
140        &mut self,
141        name: &str,
142    ) -> Result<(Vec<String>, Vec<(String, f64)>, Vec<(String, QueryExpr)>), ParseError> {
143        // Zero-argument form `name()` is rejected with a clear message.
144        if matches!(self.peek(), Token::RParen) {
145            return Err(ParseError::new(
146                format!("table function '{name}' requires at least one argument"),
147                self.position(),
148            ));
149        }
150
151        let mut args = Vec::new();
152        let mut named_args = Vec::new();
153        let mut subquery_args: Vec<(String, QueryExpr)> = Vec::new();
154        loop {
155            // Each argument starts with an identifier (the positional value or
156            // the named-argument key). A handful of named-argument keys lex as
157            // reserved keywords (e.g. `max_iterations`); accept those here so
158            // the centrality TVFs (issue #797) can name them, mapping the token
159            // back to its lowercase identifier spelling.
160            let ident = match self.advance()? {
161                Token::Ident(arg) => arg,
162                Token::MaxIterations => "max_iterations".to_string(),
163                other => {
164                    return Err(ParseError::expected(
165                        vec!["table function argument identifier"],
166                        &other,
167                        self.position(),
168                    ));
169                }
170            };
171
172            // `ident => (SELECT …)` is a named subquery argument;
173            // `ident => <number>` is a named numeric argument; otherwise it is
174            // a bare positional identifier.
175            if matches!(self.peek(), Token::FatArrow) {
176                self.advance()?; // consume '=>'
177                if matches!(self.peek(), Token::LParen) {
178                    self.advance()?; // consume '('
179                    if !self.check(&Token::Select) {
180                        let found = self.peek().clone();
181                        return Err(ParseError::expected(
182                            vec!["SELECT subquery"],
183                            &found,
184                            self.position(),
185                        ));
186                    }
187                    let query = self.parse_select_query()?;
188                    self.expect(Token::RParen)?;
189                    subquery_args.push((ident, query));
190                } else {
191                    let value = self.parse_float()?;
192                    named_args.push((ident, value));
193                }
194            } else if named_args.is_empty() && subquery_args.is_empty() {
195                args.push(ident);
196            } else {
197                return Err(ParseError::new(
198                    format!(
199                        "table function '{name}' positional argument '{ident}' cannot follow a named argument"
200                    ),
201                    self.position(),
202                ));
203            }
204
205            // A comma continues the list; a `)` ends it (consumed by caller).
206            // Anything else is a clear error (e.g. a missing closing paren).
207            match self.peek() {
208                Token::Comma => {
209                    self.advance()?;
210                    continue;
211                }
212                Token::RParen => break,
213                _ => {
214                    let found = self.peek().clone();
215                    return Err(ParseError::expected(
216                        vec!["','", "')'"],
217                        &found,
218                        self.position(),
219                    ));
220                }
221            }
222        }
223        Ok((args, named_args, subquery_args))
224    }
225
226    /// Build the `TableSource` for a table-valued function call from its
227    /// parsed argument lists. When `subquery_args` is non-empty the call uses
228    /// the inline-graph form (`nodes => / edges =>`); otherwise it is the
229    /// graph-collection form (issue #799).
230    fn build_table_function_source(
231        &self,
232        name: String,
233        args: Vec<String>,
234        named_args: Vec<(String, f64)>,
235        subquery_args: Vec<(String, QueryExpr)>,
236    ) -> Result<crate::storage::query::ast::TableSource, ParseError> {
237        use crate::storage::query::ast::TableSource;
238
239        if subquery_args.is_empty() {
240            return Ok(TableSource::Function {
241                name,
242                args,
243                named_args,
244            });
245        }
246
247        // Inline-graph form: exactly one `nodes` and one `edges` subquery, no
248        // positional graph-collection argument.
249        if !args.is_empty() {
250            return Err(ParseError::new(
251                format!(
252                    "table function '{name}' inline form takes no positional graph argument; pass `nodes => (…), edges => (…)`"
253                ),
254                self.position(),
255            ));
256        }
257
258        let mut nodes: Option<QueryExpr> = None;
259        let mut edges: Option<QueryExpr> = None;
260        for (key, query) in subquery_args {
261            if key.eq_ignore_ascii_case("nodes") {
262                if nodes.is_some() {
263                    return Err(ParseError::new(
264                        format!(
265                            "table function '{name}' has a duplicate 'nodes' subquery argument"
266                        ),
267                        self.position(),
268                    ));
269                }
270                nodes = Some(query);
271            } else if key.eq_ignore_ascii_case("edges") {
272                if edges.is_some() {
273                    return Err(ParseError::new(
274                        format!(
275                            "table function '{name}' has a duplicate 'edges' subquery argument"
276                        ),
277                        self.position(),
278                    ));
279                }
280                edges = Some(query);
281            } else {
282                return Err(ParseError::new(
283                    format!(
284                        "table function '{name}' has no subquery argument '{key}' (expected 'nodes' or 'edges')"
285                    ),
286                    self.position(),
287                ));
288            }
289        }
290
291        let (Some(nodes), Some(edges)) = (nodes, edges) else {
292            return Err(ParseError::new(
293                format!(
294                    "table function '{name}' inline form requires both `nodes => (…)` and `edges => (…)` subqueries"
295                ),
296                self.position(),
297            ));
298        };
299
300        Ok(TableSource::InlineGraphFunction {
301            name,
302            nodes: Box::new(nodes),
303            edges: Box::new(edges),
304            named_args,
305        })
306    }
307
308    /// Read one segment of a dotted table name. The graph analytics outputs
309    /// `components` and `centrality` lex as reserved keywords, so they are
310    /// normalised back to their lowercase spelling here; every other segment
311    /// (e.g. `communities`, or virtual-schema suffixes like `collections`) is
312    /// an ordinary identifier. Issue #800.
313    fn parse_table_name_segment(&mut self) -> Result<String, ParseError> {
314        match self.peek() {
315            Token::Components => {
316                self.advance()?;
317                Ok("components".to_string())
318            }
319            Token::Centrality => {
320                self.advance()?;
321                Ok("centrality".to_string())
322            }
323            _ => self.expect_ident(),
324        }
325    }
326
327    fn parse_select_query_inner(&mut self) -> Result<QueryExpr, ParseError> {
328        self.expect(Token::Select)?;
329
330        // Parse column list
331        let (select_items, columns) = self.parse_select_items_and_projections()?;
332
333        // Parse optional table source. If omitted, default to `ANY` so the query
334        // can return mixed entities (table, document, graph, and vector) by default.
335        let has_from = self.consume(&Token::From)?;
336        // Optional structured FROM source. Currently populated only for
337        // table-valued function calls such as `components(g)` (issue #795);
338        // plain tables leave this `None` and rely on the legacy `table` slot.
339        let mut table_source: Option<crate::storage::query::ast::TableSource> = None;
340        let table = if has_from {
341            if self.consume(&Token::Queue)? {
342                let queue = self.expect_ident()?;
343                let filter = if self.consume(&Token::Where)? {
344                    Some(self.parse_filter()?)
345                } else {
346                    None
347                };
348                let limit = if self.consume(&Token::Limit)? {
349                    Some(self.parse_integer()? as u64)
350                } else {
351                    None
352                };
353                return Ok(QueryExpr::QueueSelect(QueueSelectQuery {
354                    queue,
355                    columns: queue_projection_columns(&columns)?,
356                    filter,
357                    limit,
358                }));
359            } else if self.consume(&Token::Star)? {
360                "*".to_string()
361            } else if self.consume(&Token::All)? {
362                "all".to_string()
363            } else if matches!(self.peek(), Token::Components) {
364                // `components` lexes as the graph-analytics keyword
365                // `Token::Components`, so it never reaches `expect_ident`.
366                // In FROM position it is the connected-components
367                // table-valued function: `FROM components(g)` (issue #795).
368                self.advance()?; // consume COMPONENTS
369                let name = "components".to_string();
370                self.expect(Token::LParen)?;
371                let (args, named_args, subquery_args) = self.parse_table_function_args(&name)?;
372                self.expect(Token::RParen)?;
373                table_source = Some(self.build_table_function_source(
374                    name.clone(),
375                    args,
376                    named_args,
377                    subquery_args,
378                )?);
379                name
380            } else if matches!(self.peek(), Token::ShortestPath) {
381                // `shortest_path` lexes as the graph-analytics keyword
382                // `Token::ShortestPath`, so it never reaches `expect_ident`.
383                // In FROM position it is the shortest-path table-valued
384                // function: `FROM shortest_path(g, src => .., dst => ..)`
385                // (issue #798). The bare `SHORTEST_PATH ... FROM ... TO ...`
386                // graph-command form is dispatched separately at statement
387                // start, so the two grammars never collide.
388                self.advance()?; // consume SHORTEST_PATH
389                let name = "shortest_path".to_string();
390                self.expect(Token::LParen)?;
391                let (args, named_args, subquery_args) = self.parse_table_function_args(&name)?;
392                self.expect(Token::RParen)?;
393                table_source = Some(self.build_table_function_source(
394                    name.clone(),
395                    args,
396                    named_args,
397                    subquery_args,
398                )?);
399                name
400            } else {
401                let ident = self.expect_ident()?;
402                // Table-valued function call: `ident(arg, ...)` (issue #795).
403                if matches!(self.peek(), Token::LParen) {
404                    self.advance()?; // consume '('
405                    let (args, named_args, subquery_args) =
406                        self.parse_table_function_args(&ident)?;
407                    self.expect(Token::RParen)?;
408                    table_source = Some(self.build_table_function_source(
409                        ident.clone(),
410                        args,
411                        named_args,
412                        subquery_args,
413                    )?);
414                    ident
415                } else {
416                    // Dotted table name, e.g. the `<graph>.<output>` analytics
417                    // virtual view `g.communities` (issue #800) or a schema-
418                    // qualified virtual table such as `red.collections`. The
419                    // dotted form is kept verbatim in `table`; the runtime
420                    // resolves a real collection of that exact name first, then
421                    // falls back to analytics-view resolution.
422                    let mut name = ident;
423                    while matches!(self.peek(), Token::Dot) {
424                        self.advance()?; // consume '.'
425                        let segment = self.parse_table_name_segment()?;
426                        name.push('.');
427                        name.push_str(&segment);
428                    }
429                    name
430                }
431            }
432        } else {
433            "any".to_string()
434        };
435
436        // Parse optional alias (only when a FROM clause exists).
437        // `AS OF` is a clause — don't gobble the `AS` as an alias
438        // marker when the following token is `OF`.
439        let alias =
440            if !has_from || (self.check(&Token::As) && matches!(self.peek_next()?, Token::Of)) {
441                None
442            } else if self.consume(&Token::As)?
443                || (self.check(&Token::Ident("".into())) && !self.is_clause_keyword())
444            {
445                Some(self.expect_ident()?)
446            } else {
447                None
448            };
449
450        let mut query = TableQuery {
451            table,
452            source: table_source,
453            alias,
454            select_items,
455            columns,
456            where_expr: None,
457            filter: None,
458            group_by_exprs: Vec::new(),
459            group_by: Vec::new(),
460            having_expr: None,
461            having: None,
462            order_by: Vec::new(),
463            limit: None,
464            limit_param: None,
465            offset: None,
466            offset_param: None,
467            expand: None,
468            as_of: None,
469            sessionize: None,
470        };
471
472        if self.is_join_keyword() {
473            let return_items = std::mem::take(&mut query.select_items);
474            let return_ = std::mem::take(&mut query.columns);
475            let mut expr = self.parse_join_query(QueryExpr::Table(query))?;
476            if let QueryExpr::Join(join) = &mut expr {
477                join.return_items = return_items;
478                join.return_ = return_;
479            }
480            return Ok(expr);
481        }
482
483        // SESSIONIZE BY <ident> GAP <duration> [ORDER BY <ident>]
484        // — issue #585 slice 8. Parsed before WHERE/GROUP BY so the
485        // optional inner ORDER BY (which the user binds to the
486        // operator's timestamp axis) cannot be confused with the
487        // SELECT's top-level ORDER BY further down. Both `BY` and
488        // `GAP` may be omitted when the source collection's
489        // descriptor carries `SESSION_KEY` / `SESSION_GAP` defaults
490        // (slice 1) — the executor resolves them at run time and
491        // raises `MissingSessionKey` if neither side supplies a
492        // value.
493        if self.consume(&Token::Sessionize)? {
494            query.sessionize = Some(self.parse_sessionize_clause()?);
495        }
496
497        // Parse optional clauses
498        self.parse_table_clauses(&mut query)?;
499
500        Ok(QueryExpr::Table(query))
501    }
502
503    fn parse_sessionize_clause(
504        &mut self,
505    ) -> Result<crate::storage::query::ast::SessionizeClause, ParseError> {
506        use crate::storage::query::ast::SessionizeClause;
507
508        let mut clause = SessionizeClause::default();
509
510        if self.consume(&Token::By)? {
511            clause.actor_col = Some(self.expect_ident()?);
512        }
513        if self.consume(&Token::Gap)? {
514            let value = self.parse_float()?;
515            let unit = self.parse_duration_unit()?;
516            clause.gap_ms = Some((value * unit) as u64);
517        }
518        // Optional `ORDER BY <ident>` immediately after GAP. The
519        // top-level SELECT ORDER BY parsed by `parse_table_clauses`
520        // sees the next ORDER token, so this only consumes the one
521        // immediately attached to SESSIONIZE.
522        if self.consume(&Token::Order)? {
523            self.expect(Token::By)?;
524            clause.order_col = Some(self.expect_ident()?);
525        }
526        Ok(clause)
527    }
528}
529
530impl<'a> Parser<'a> {
531    /// Check if current identifier is a clause keyword
532    pub fn is_clause_keyword(&self) -> bool {
533        matches!(
534            self.peek(),
535            Token::Where
536                | Token::Order
537                | Token::Limit
538                | Token::Offset
539                | Token::Join
540                | Token::Inner
541                | Token::Left
542                | Token::Right
543                | Token::As
544                | Token::Sessionize
545        )
546    }
547
548    /// Parse projection list (column selections)
549    pub fn parse_projection_list(&mut self) -> Result<Vec<Projection>, ParseError> {
550        Ok(self.parse_select_items_and_projections()?.1)
551    }
552
553    pub(crate) fn parse_select_items_and_projections(
554        &mut self,
555    ) -> Result<(Vec<SelectItem>, Vec<Projection>), ParseError> {
556        // Handle SELECT *
557        if self.consume(&Token::Star)? {
558            return Ok((vec![SelectItem::Wildcard], Vec::new())); // Empty legacy vec means all columns
559        }
560
561        let mut select_items = Vec::new();
562        let mut projections = Vec::new();
563        loop {
564            let (item, proj) = self.parse_projection()?;
565            select_items.push(item);
566            projections.push(proj);
567
568            if !self.consume(&Token::Comma)? {
569                break;
570            }
571        }
572        Ok((select_items, projections))
573    }
574
575    /// Parse a single projection — supports columns, aggregate functions, and scalar functions
576    fn parse_projection(&mut self) -> Result<(SelectItem, Projection), ParseError> {
577        let expr = self.parse_expr()?;
578        if contains_nested_aggregate(&expr) && !is_plain_aggregate_expr(&expr) {
579            return Err(ParseError::new(
580                "aggregate function is not valid inside another expression".to_string(),
581                self.position(),
582            ));
583        }
584        let alias = if self.consume(&Token::As)? {
585            Some(self.expect_column_ident()?)
586        } else {
587            None
588        };
589        let select_item = SelectItem::Expr {
590            expr: expr.clone(),
591            alias: alias.clone(),
592        };
593        let projection = select_item_to_projection(&select_item).ok_or_else(|| {
594            ParseError::new(
595                "projection cannot yet be lowered to legacy runtime representation".to_string(),
596                self.position(),
597            )
598        })?;
599        Ok((select_item, projection))
600    }
601}
602
603fn contains_nested_aggregate(expr: &Expr) -> bool {
604    match expr {
605        Expr::FunctionCall { name, args, .. } => {
606            is_aggregate_function(&name.to_uppercase())
607                || args.iter().any(contains_nested_aggregate)
608        }
609        // Issue #589 slice 7a: a window function aggregate (e.g.
610        // `SUM(x) OVER (...)`) is NOT a plain aggregate from the
611        // group-by analyser's point of view — it operates over a
612        // partitioned window, not a GROUP BY group. We still recurse
613        // into args / partition / order keys so a *nested* aggregate
614        // (e.g. `SUM(COUNT(*) OVER ()) OVER (...)`) is caught.
615        Expr::WindowFunctionCall { args, window, .. } => {
616            args.iter().any(contains_nested_aggregate)
617                || window.partition_by.iter().any(contains_nested_aggregate)
618                || window
619                    .order_by
620                    .iter()
621                    .any(|o| contains_nested_aggregate(&o.expr))
622        }
623        Expr::BinaryOp { lhs, rhs, .. } => {
624            contains_nested_aggregate(lhs) || contains_nested_aggregate(rhs)
625        }
626        Expr::UnaryOp { operand, .. } | Expr::IsNull { operand, .. } => {
627            contains_nested_aggregate(operand)
628        }
629        Expr::Cast { inner, .. } => contains_nested_aggregate(inner),
630        Expr::Case {
631            branches, else_, ..
632        } => {
633            branches.iter().any(|(cond, value)| {
634                contains_nested_aggregate(cond) || contains_nested_aggregate(value)
635            }) || else_.as_deref().is_some_and(contains_nested_aggregate)
636        }
637        Expr::InList { target, values, .. } => {
638            contains_nested_aggregate(target) || values.iter().any(contains_nested_aggregate)
639        }
640        Expr::Between {
641            target, low, high, ..
642        } => {
643            contains_nested_aggregate(target)
644                || contains_nested_aggregate(low)
645                || contains_nested_aggregate(high)
646        }
647        Expr::Literal { .. }
648        | Expr::Column { .. }
649        | Expr::Parameter { .. }
650        | Expr::Subquery { .. } => false,
651    }
652}
653
654fn is_plain_aggregate_expr(expr: &Expr) -> bool {
655    match expr {
656        Expr::FunctionCall { name, args, .. } if is_aggregate_function(&name.to_uppercase()) => {
657            !args.iter().any(contains_nested_aggregate)
658        }
659        _ => false,
660    }
661}
662
663fn attach_projection_alias(proj: Projection, alias: Option<String>) -> Projection {
664    let Some(alias) = alias else { return proj };
665    match proj {
666        Projection::Field(field, _) => Projection::Field(field, Some(alias)),
667        Projection::Expression(filter, _) => Projection::Expression(filter, Some(alias)),
668        Projection::Function(name, args) => {
669            if name.contains(':') {
670                Projection::Function(name, args)
671            } else {
672                Projection::Function(format!("{name}:{alias}"), args)
673            }
674        }
675        Projection::Column(column) => Projection::Alias(column, alias),
676        Projection::Window {
677            name, args, window, ..
678        } => Projection::Window {
679            name,
680            args,
681            window,
682            alias: Some(alias),
683        },
684        other => other,
685    }
686}
687
688fn queue_projection_columns(columns: &[Projection]) -> Result<Vec<String>, ParseError> {
689    let mut out = Vec::new();
690    for column in columns {
691        match column {
692            Projection::Column(name) => out.push(name.clone()),
693            Projection::Alias(name, _) => out.push(name.clone()),
694            Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
695                out.push(column.clone());
696            }
697            Projection::All => return Ok(Vec::new()),
698            other => {
699                return Err(ParseError::new(
700                    format!(
701                        "unsupported SELECT FROM QUEUE projection {other:?}; use `SELECT *` or bare column names, or use queue verbs (PUSH, POP, PEEK, LEN, ACK, NACK, …) for queue operations"
702                    ),
703                    crate::storage::query::lexer::Position::default(),
704                ));
705            }
706        }
707    }
708    Ok(out)
709}
710
711impl<'a> Parser<'a> {
712    /// Parse table query clauses (AS OF, WHERE, GROUP BY, HAVING, ORDER BY, LIMIT, OFFSET)
713    pub fn parse_table_clauses(&mut self, query: &mut TableQuery) -> Result<(), ParseError> {
714        // AS OF clause — time-travel anchor. Must come before WHERE
715        // so the executor can bind the snapshot before filter eval.
716        if self.check(&Token::As) {
717            let next_is_of = matches!(self.peek_next()?, Token::Of);
718            if next_is_of {
719                self.expect(Token::As)?;
720                self.expect(Token::Of)?;
721                query.as_of = Some(self.parse_as_of_spec()?);
722            }
723        }
724
725        // WHERE clause
726        if self.consume(&Token::Where)? {
727            let filter = self.parse_filter()?;
728            query.where_expr = Some(filter_to_expr(&filter));
729            query.filter = Some(filter);
730        }
731
732        // GROUP BY clause
733        if self.consume(&Token::Group)? {
734            self.expect(Token::By)?;
735            let (group_by_exprs, group_by) = self.parse_group_by_items()?;
736            query.group_by_exprs = group_by_exprs;
737            query.group_by = group_by;
738        }
739
740        // HAVING clause (only valid after GROUP BY)
741        if !query.group_by_exprs.is_empty() && self.consume_ident_ci("HAVING")? {
742            let having = self.parse_filter()?;
743            query.having_expr = Some(filter_to_expr(&having));
744            query.having = Some(having);
745        }
746
747        // ORDER BY clause
748        if self.consume(&Token::Order)? {
749            self.expect(Token::By)?;
750            query.order_by = self.parse_order_by_list()?;
751        }
752
753        // LIMIT clause
754        if self.consume(&Token::Limit)? {
755            if matches!(self.peek(), Token::Dollar | Token::Question) {
756                query.limit_param = Some(self.parse_param_slot("LIMIT")?);
757                query.limit = None;
758            } else {
759                query.limit = Some(self.parse_integer()? as u64);
760            }
761        }
762
763        // OFFSET clause
764        if self.consume(&Token::Offset)? {
765            if matches!(self.peek(), Token::Dollar | Token::Question) {
766                query.offset_param = Some(self.parse_param_slot("OFFSET")?);
767                query.offset = None;
768            } else {
769                query.offset = Some(self.parse_integer()? as u64);
770            }
771        }
772
773        // WITH EXPAND clause
774        if self.consume(&Token::With)? && self.consume_ident_ci("EXPAND")? {
775            query.expand = Some(self.parse_expand_options()?);
776        }
777
778        Ok(())
779    }
780
781    /// Parse an AS OF spec after `AS OF` has already been consumed.
782    /// Grammar:
783    ///   AS OF COMMIT   '<hex>'
784    ///   AS OF BRANCH   '<name>'
785    ///   AS OF TAG      '<name>'
786    ///   AS OF TIMESTAMP <integer-ms>
787    ///   AS OF SNAPSHOT  <xid>
788    fn parse_as_of_spec(&mut self) -> Result<crate::storage::query::ast::AsOfClause, ParseError> {
789        use crate::storage::query::ast::AsOfClause;
790
791        // Keyword — accept both tokenized forms (e.g. Token::Commit
792        // if present) and bare identifiers for flexibility.
793        let keyword = match self.peek() {
794            Token::Ident(s) => {
795                let s = s.to_ascii_uppercase();
796                self.advance()?;
797                s
798            }
799            Token::Commit => {
800                self.advance()?;
801                "COMMIT".to_string()
802            }
803            other => {
804                return Err(ParseError::expected(
805                    vec!["COMMIT", "BRANCH", "TAG", "TIMESTAMP", "SNAPSHOT"],
806                    other,
807                    self.position(),
808                ));
809            }
810        };
811
812        match keyword.as_str() {
813            "COMMIT" => {
814                let value = self.parse_string()?;
815                Ok(AsOfClause::Commit(value))
816            }
817            "BRANCH" => {
818                let value = self.parse_string()?;
819                Ok(AsOfClause::Branch(value))
820            }
821            "TAG" => {
822                let value = self.parse_string()?;
823                Ok(AsOfClause::Tag(value))
824            }
825            "TIMESTAMP" => {
826                let value = self.parse_integer()?;
827                Ok(AsOfClause::TimestampMs(value))
828            }
829            "SNAPSHOT" => {
830                let value = self.parse_integer()?;
831                if value < 0 {
832                    return Err(ParseError::new(
833                        "AS OF SNAPSHOT requires non-negative xid".to_string(),
834                        self.position(),
835                    ));
836                }
837                Ok(AsOfClause::Snapshot(value as u64))
838            }
839            other => Err(ParseError::expected(
840                vec!["COMMIT", "BRANCH", "TAG", "TIMESTAMP", "SNAPSHOT"],
841                &Token::Ident(other.into()),
842                self.position(),
843            )),
844        }
845    }
846
847    /// Parse EXPAND options: GRAPH [DEPTH n], CROSS_REFS, ALL
848    fn parse_expand_options(
849        &mut self,
850    ) -> Result<crate::storage::query::ast::ExpandOptions, ParseError> {
851        use crate::storage::query::ast::ExpandOptions;
852        let mut opts = ExpandOptions::default();
853
854        loop {
855            if self.consume(&Token::Graph)? || self.consume_ident_ci("GRAPH")? {
856                opts.graph = true;
857                opts.graph_depth = if self.consume(&Token::Depth)? {
858                    self.parse_integer()? as usize
859                } else {
860                    1
861                };
862            } else if self.consume_ident_ci("CROSS_REFS")?
863                || self.consume_ident_ci("CROSSREFS")?
864                || self.consume_ident_ci("REFS")?
865            {
866                opts.cross_refs = true;
867            } else if self.consume(&Token::All)? || self.consume_ident_ci("ALL")? {
868                opts.graph = true;
869                opts.cross_refs = true;
870                opts.graph_depth = 1;
871            } else {
872                break;
873            }
874            if !self.consume(&Token::Comma)? {
875                break;
876            }
877        }
878
879        if !opts.graph && !opts.cross_refs {
880            opts.graph = true;
881            opts.cross_refs = true;
882            opts.graph_depth = 1;
883        }
884
885        Ok(opts)
886    }
887
888    /// Parse GROUP BY field list
889    pub fn parse_group_by_list(&mut self) -> Result<Vec<String>, ParseError> {
890        Ok(self.parse_group_by_items()?.1)
891    }
892
893    fn parse_group_by_items(&mut self) -> Result<(Vec<Expr>, Vec<String>), ParseError> {
894        let mut exprs = Vec::new();
895        let mut fields = Vec::new();
896        loop {
897            let expr = self.parse_expr()?;
898            let rendered = render_group_by_expr(&expr).ok_or_else(|| {
899                ParseError::new(
900                    "GROUP BY expression cannot yet be lowered to legacy runtime representation"
901                        .to_string(),
902                    self.position(),
903                )
904            })?;
905            exprs.push(expr);
906            fields.push(rendered);
907            if !self.consume(&Token::Comma)? {
908                break;
909            }
910        }
911        Ok((exprs, fields))
912    }
913
914    /// Parse ORDER BY list.
915    ///
916    /// Fase 1.6 unlock: uses the new `Expr` Pratt parser so
917    /// `ORDER BY CAST(age AS INT)`, `ORDER BY a + b * 2`,
918    /// `ORDER BY last_seen - created_at` all parse cleanly. If the
919    /// parsed expression is a bare `Column`, we store it in the
920    /// legacy `field` slot and leave `expr` None so downstream
921    /// consumers (planner cost, mode translators) keep using the
922    /// fast path. Otherwise we stash the full tree in `expr` and
923    /// populate `field` with a synthetic marker that runtime code
924    /// never touches.
925    pub fn parse_order_by_list(&mut self) -> Result<Vec<OrderByClause>, ParseError> {
926        use super::super::ast::Expr as AstExpr;
927        let mut clauses = Vec::new();
928        loop {
929            let parsed = self.parse_expr()?;
930            let (field, expr_slot) = match parsed {
931                AstExpr::Column { field, .. } => (field, None),
932                other => (
933                    // Synthetic placeholder so legacy pattern-matches
934                    // on `OrderByClause.field` still destructure.
935                    // Runtime comparators check `expr` first when set,
936                    // so the sentinel never gets resolved against a
937                    // real record.
938                    FieldRef::TableColumn {
939                        table: String::new(),
940                        column: String::new(),
941                    },
942                    Some(other),
943                ),
944            };
945
946            let ascending = if self.consume(&Token::Desc)? {
947                false
948            } else {
949                self.consume(&Token::Asc)?;
950                true
951            };
952
953            let nulls_first = if self.consume(&Token::Nulls)? {
954                if self.consume(&Token::First)? {
955                    true
956                } else {
957                    self.expect(Token::Last)?;
958                    false
959                }
960            } else {
961                !ascending // Default: nulls last for ASC, first for DESC
962            };
963
964            clauses.push(OrderByClause {
965                field,
966                expr: expr_slot,
967                ascending,
968                nulls_first,
969            });
970
971            if !self.consume(&Token::Comma)? {
972                break;
973            }
974        }
975        Ok(clauses)
976    }
977
978    fn parse_function_literal_arg(&mut self) -> Result<String, ParseError> {
979        let negative = self.consume(&Token::Dash)?;
980        let mut literal = match self.advance()? {
981            Token::Integer(n) => {
982                if negative {
983                    format!("-{n}")
984                } else {
985                    n.to_string()
986                }
987            }
988            Token::Float(n) => {
989                let value = if negative { -n } else { n };
990                if value.fract().abs() < f64::EPSILON {
991                    format!("{}", value as i64)
992                } else {
993                    value.to_string()
994                }
995            }
996            other => {
997                return Err(ParseError::new(
998                    // F-05: `other` is a `Token` whose Display arms emit raw
999                    // user bytes for `Ident` / `String` / `JsonLiteral`.
1000                    // Render via `{:?}` so CR/LF/NUL/quotes are escaped
1001                    // before the message reaches downstream serialization
1002                    // sinks.
1003                    format!("expected number, got {:?}", other),
1004                    self.position(),
1005                ));
1006            }
1007        };
1008
1009        if let Token::Ident(unit) = self.peek().clone() {
1010            if is_duration_unit(&unit) {
1011                self.advance()?;
1012                literal.push_str(&unit.to_ascii_lowercase());
1013            }
1014        }
1015
1016        Ok(literal)
1017    }
1018}
1019
/// Returns `true` when `unit` is a recognised duration suffix
/// (milliseconds through days), compared ASCII case-insensitively.
///
/// The comparison is done in place, so no lowercased copy of `unit` is
/// allocated.
fn is_duration_unit(unit: &str) -> bool {
    const UNITS: &[&str] = &[
        "ms",
        "msec",
        "millisecond",
        "milliseconds",
        "s",
        "sec",
        "secs",
        "second",
        "seconds",
        "m",
        "min",
        "mins",
        "minute",
        "minutes",
        "h",
        "hr",
        "hrs",
        "hour",
        "hours",
        "d",
        "day",
        "days",
    ];
    UNITS.iter().any(|known| unit.eq_ignore_ascii_case(known))
}
1046
1047fn render_group_by_expr(expr: &Expr) -> Option<String> {
1048    match expr {
1049        Expr::Column { field, .. } => match field {
1050            FieldRef::TableColumn { table, column } if table.is_empty() => Some(column.clone()),
1051            FieldRef::TableColumn { table, column } => Some(format!("{table}.{column}")),
1052            other => Some(format!("{other:?}")),
1053        },
1054        Expr::FunctionCall { name, args, .. } if name.eq_ignore_ascii_case("TIME_BUCKET") => {
1055            let rendered = args
1056                .iter()
1057                .map(render_group_by_expr)
1058                .collect::<Option<Vec<_>>>()?;
1059            Some(format!("TIME_BUCKET({})", rendered.join(",")))
1060        }
1061        Expr::Literal { value, .. } => Some(match value {
1062            Value::Null => String::new(),
1063            Value::Text(text) => text.to_string(),
1064            other => other.to_string(),
1065        }),
1066        _ => expr_to_projection(expr).map(|projection| match projection {
1067            Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
1068                column
1069            }
1070            Projection::Field(FieldRef::TableColumn { table, column }, _) => {
1071                format!("{table}.{column}")
1072            }
1073            Projection::Function(name, args) => {
1074                let rendered = args
1075                    .iter()
1076                    .map(render_group_by_function_arg)
1077                    .collect::<Option<Vec<_>>>()
1078                    .unwrap_or_default();
1079                format!(
1080                    "{}({})",
1081                    name.split(':').next().unwrap_or(&name),
1082                    rendered.join(",")
1083                )
1084            }
1085            Projection::Column(column) | Projection::Alias(column, _) => column,
1086            Projection::All => "*".to_string(),
1087            Projection::Expression(_, _) => "expr".to_string(),
1088            Projection::Field(other, _) => format!("{other:?}"),
1089            Projection::Window { name, .. } => name,
1090        }),
1091    }
1092}
1093
1094fn render_group_by_function_arg(arg: &Projection) -> Option<String> {
1095    match arg {
1096        Projection::Column(col) => Some(
1097            col.strip_prefix("LIT:")
1098                .map(str::to_string)
1099                .unwrap_or_else(|| col.clone()),
1100        ),
1101        Projection::All => Some("*".to_string()),
1102        _ => None,
1103    }
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108    use super::*;
1109    use crate::storage::query::ast::{AsOfClause, BinOp, CompareOp, ExpandOptions, TableSource};
1110
1111    fn parse_table(sql: &str) -> TableQuery {
1112        let parsed = super::super::parse(sql).unwrap().query;
1113        let QueryExpr::Table(table) = parsed else {
1114            panic!("expected table query");
1115        };
1116        table
1117    }
1118
1119    fn col(name: &str) -> Expr {
1120        Expr::Column {
1121            field: FieldRef::TableColumn {
1122                table: String::new(),
1123                column: name.to_string(),
1124            },
1125            span: Span::synthetic(),
1126        }
1127    }
1128
1129    #[test]
1130    fn helper_function_catalogs_cover_all_names() {
1131        for name in [
1132            "GEO_DISTANCE",
1133            "GEO_DISTANCE_VINCENTY",
1134            "GEO_BEARING",
1135            "GEO_MIDPOINT",
1136            "HAVERSINE",
1137            "VINCENTY",
1138            "TIME_BUCKET",
1139            "UPPER",
1140            "LOWER",
1141            "LENGTH",
1142            "CHAR_LENGTH",
1143            "CHARACTER_LENGTH",
1144            "OCTET_LENGTH",
1145            "BIT_LENGTH",
1146            "SUBSTRING",
1147            "SUBSTR",
1148            "POSITION",
1149            "TRIM",
1150            "LTRIM",
1151            "RTRIM",
1152            "BTRIM",
1153            "CONCAT",
1154            "CONCAT_WS",
1155            "REVERSE",
1156            "LEFT",
1157            "RIGHT",
1158            "QUOTE_LITERAL",
1159            "ABS",
1160            "ROUND",
1161            "COALESCE",
1162            "STDDEV",
1163            "VARIANCE",
1164            "MEDIAN",
1165            "PERCENTILE",
1166            "GROUP_CONCAT",
1167            "STRING_AGG",
1168            "FIRST",
1169            "LAST",
1170            "ARRAY_AGG",
1171            "COUNT_DISTINCT",
1172            "MONEY",
1173            "MONEY_ASSET",
1174            "MONEY_MINOR",
1175            "MONEY_SCALE",
1176            "VERIFY_PASSWORD",
1177            "CAST",
1178            "CASE",
1179        ] {
1180            assert!(is_scalar_function(name), "{name}");
1181        }
1182        assert!(!is_scalar_function("NOT_A_FUNCTION"));
1183
1184        for name in [
1185            "COUNT",
1186            "AVG",
1187            "SUM",
1188            "MIN",
1189            "MAX",
1190            "STDDEV",
1191            "VARIANCE",
1192            "MEDIAN",
1193            "PERCENTILE",
1194            "GROUP_CONCAT",
1195            "STRING_AGG",
1196            "FIRST",
1197            "LAST",
1198            "ARRAY_AGG",
1199            "COUNT_DISTINCT",
1200        ] {
1201            assert!(is_aggregate_function(name), "{name}");
1202        }
1203        assert!(!is_aggregate_function("LOWER"));
1204
1205        assert_eq!(aggregate_token_name(&Token::Count), Some("COUNT"));
1206        assert_eq!(aggregate_token_name(&Token::Sum), Some("SUM"));
1207        assert_eq!(aggregate_token_name(&Token::Avg), Some("AVG"));
1208        assert_eq!(aggregate_token_name(&Token::Min), Some("MIN"));
1209        assert_eq!(aggregate_token_name(&Token::Max), Some("MAX"));
1210        assert_eq!(aggregate_token_name(&Token::First), Some("FIRST"));
1211        assert_eq!(aggregate_token_name(&Token::Last), Some("LAST"));
1212        assert_eq!(aggregate_token_name(&Token::Ident("COUNT".into())), None);
1213
1214        assert_eq!(scalar_token_name(&Token::Left), Some("LEFT"));
1215        assert_eq!(scalar_token_name(&Token::Right), Some("RIGHT"));
1216        assert_eq!(scalar_token_name(&Token::Ident("LEFT".into())), None);
1217
1218        for unit in [
1219            "ms",
1220            "msec",
1221            "millisecond",
1222            "milliseconds",
1223            "s",
1224            "sec",
1225            "secs",
1226            "second",
1227            "seconds",
1228            "m",
1229            "min",
1230            "mins",
1231            "minute",
1232            "minutes",
1233            "h",
1234            "hr",
1235            "hrs",
1236            "hour",
1237            "hours",
1238            "d",
1239            "day",
1240            "days",
1241        ] {
1242            assert!(is_duration_unit(unit), "{unit}");
1243        }
1244        assert!(!is_duration_unit("fortnight"));
1245    }
1246
1247    #[test]
1248    fn projection_and_group_render_helpers_cover_aliases_and_exprs() {
1249        let field = FieldRef::TableColumn {
1250            table: String::new(),
1251            column: "name".into(),
1252        };
1253        let filter = Filter::Compare {
1254            field: field.clone(),
1255            op: CompareOp::Eq,
1256            value: Value::text("alice"),
1257        };
1258
1259        assert_eq!(
1260            attach_projection_alias(Projection::Field(field.clone(), None), Some("n".into())),
1261            Projection::Field(field.clone(), Some("n".into()))
1262        );
1263        assert_eq!(
1264            attach_projection_alias(
1265                Projection::Expression(Box::new(filter.clone()), None),
1266                Some("ok".into())
1267            ),
1268            Projection::Expression(Box::new(filter), Some("ok".into()))
1269        );
1270        assert_eq!(
1271            attach_projection_alias(
1272                Projection::Function("LOWER".into(), vec![]),
1273                Some("l".into())
1274            ),
1275            Projection::Function("LOWER:l".into(), vec![])
1276        );
1277        assert_eq!(
1278            attach_projection_alias(
1279                Projection::Function("LOWER:l".into(), vec![]),
1280                Some("ignored".into())
1281            ),
1282            Projection::Function("LOWER:l".into(), vec![])
1283        );
1284        assert_eq!(
1285            attach_projection_alias(Projection::Column("name".into()), Some("n".into())),
1286            Projection::Alias("name".into(), "n".into())
1287        );
1288        assert_eq!(
1289            attach_projection_alias(Projection::All, Some("ignored".into())),
1290            Projection::All
1291        );
1292
1293        assert_eq!(render_group_by_expr(&col("dept")).as_deref(), Some("dept"));
1294        assert_eq!(
1295            render_group_by_expr(&Expr::Column {
1296                field: FieldRef::TableColumn {
1297                    table: "employees".into(),
1298                    column: "dept".into()
1299                },
1300                span: Span::synthetic()
1301            })
1302            .as_deref(),
1303            Some("employees.dept")
1304        );
1305        assert_eq!(
1306            render_group_by_expr(&Expr::Column {
1307                field: FieldRef::NodeId { alias: "n".into() },
1308                span: Span::synthetic()
1309            }),
1310            Some("NodeId { alias: \"n\" }".into())
1311        );
1312        assert_eq!(
1313            render_group_by_expr(&Expr::Literal {
1314                value: Value::Null,
1315                span: Span::synthetic()
1316            })
1317            .as_deref(),
1318            Some("")
1319        );
1320        assert_eq!(
1321            render_group_by_expr(&Expr::Literal {
1322                value: Value::text("5m"),
1323                span: Span::synthetic()
1324            })
1325            .as_deref(),
1326            Some("5m")
1327        );
1328        assert_eq!(
1329            render_group_by_expr(&Expr::Literal {
1330                value: Value::Integer(7),
1331                span: Span::synthetic()
1332            })
1333            .as_deref(),
1334            Some("7")
1335        );
1336        assert_eq!(
1337            render_group_by_expr(&Expr::FunctionCall {
1338                name: "TIME_BUCKET".into(),
1339                args: vec![
1340                    col("ts"),
1341                    Expr::Literal {
1342                        value: Value::text("5m"),
1343                        span: Span::synthetic()
1344                    }
1345                ],
1346                span: Span::synthetic()
1347            })
1348            .as_deref(),
1349            Some("TIME_BUCKET(ts,5m)")
1350        );
1351        assert_eq!(
1352            render_group_by_expr(&Expr::FunctionCall {
1353                name: "LOWER".into(),
1354                args: vec![col("dept")],
1355                span: Span::synthetic()
1356            })
1357            .as_deref(),
1358            Some("LOWER()")
1359        );
1360
1361        assert_eq!(
1362            render_group_by_function_arg(&Projection::Column("LIT:5m".into())),
1363            Some("5m".into())
1364        );
1365        assert_eq!(
1366            render_group_by_function_arg(&Projection::Column("dept".into())),
1367            Some("dept".into())
1368        );
1369        assert_eq!(
1370            render_group_by_function_arg(&Projection::All),
1371            Some("*".into())
1372        );
1373        assert_eq!(
1374            render_group_by_function_arg(&Projection::Function("LOWER".into(), vec![])),
1375            None
1376        );
1377    }
1378
1379    #[test]
1380    fn expression_aggregate_detection_branches() {
1381        let count = Expr::FunctionCall {
1382            name: "COUNT".into(),
1383            args: vec![col("id")],
1384            span: Span::synthetic(),
1385        };
1386        assert!(contains_nested_aggregate(&count));
1387        assert!(is_plain_aggregate_expr(&count));
1388
1389        let nested = Expr::FunctionCall {
1390            name: "SUM".into(),
1391            args: vec![count.clone()],
1392            span: Span::synthetic(),
1393        };
1394        assert!(contains_nested_aggregate(&nested));
1395        assert!(!is_plain_aggregate_expr(&nested));
1396
1397        let binary = Expr::BinaryOp {
1398            op: BinOp::Add,
1399            lhs: Box::new(col("a")),
1400            rhs: Box::new(count.clone()),
1401            span: Span::synthetic(),
1402        };
1403        assert!(contains_nested_aggregate(&binary));
1404
1405        let unary = Expr::UnaryOp {
1406            op: UnaryOp::Not,
1407            operand: Box::new(count.clone()),
1408            span: Span::synthetic(),
1409        };
1410        assert!(contains_nested_aggregate(&unary));
1411
1412        let cast = Expr::Cast {
1413            inner: Box::new(count.clone()),
1414            target: crate::storage::schema::DataType::Integer,
1415            span: Span::synthetic(),
1416        };
1417        assert!(contains_nested_aggregate(&cast));
1418
1419        let case = Expr::Case {
1420            branches: vec![(col("flag"), count.clone())],
1421            else_: Some(Box::new(col("fallback"))),
1422            span: Span::synthetic(),
1423        };
1424        assert!(contains_nested_aggregate(&case));
1425
1426        let in_list = Expr::InList {
1427            target: Box::new(col("id")),
1428            values: vec![count.clone()],
1429            negated: false,
1430            span: Span::synthetic(),
1431        };
1432        assert!(contains_nested_aggregate(&in_list));
1433
1434        let between = Expr::Between {
1435            target: Box::new(col("id")),
1436            low: Box::new(col("low")),
1437            high: Box::new(count),
1438            negated: false,
1439            span: Span::synthetic(),
1440        };
1441        assert!(contains_nested_aggregate(&between));
1442        assert!(!contains_nested_aggregate(&Expr::Parameter {
1443            index: 1,
1444            span: Span::synthetic()
1445        }));
1446
1447        assert!(super::super::parse("SELECT SUM(COUNT(id)) FROM t").is_err());
1448    }
1449
1450    #[test]
1451    fn table_clause_parsing_covers_as_of_order_offset_and_expand() {
1452        let table = parse_table(
1453            "SELECT name FROM users AS OF COMMIT 'abc123' \
1454             WHERE deleted_at IS NULL \
1455             ORDER BY LOWER(name) ASC NULLS FIRST, created_at DESC NULLS LAST \
1456             LIMIT 10 OFFSET 5 WITH EXPAND GRAPH DEPTH 3, CROSS_REFS",
1457        );
1458        assert!(matches!(table.as_of, Some(AsOfClause::Commit(ref v)) if v == "abc123"));
1459        assert!(table.filter.is_some());
1460        assert_eq!(table.order_by.len(), 2);
1461        assert!(table.order_by[0].expr.is_some());
1462        assert!(table.order_by[0].ascending);
1463        assert!(table.order_by[0].nulls_first);
1464        assert!(!table.order_by[1].ascending);
1465        assert!(!table.order_by[1].nulls_first);
1466        assert_eq!(table.limit, Some(10));
1467        assert_eq!(table.offset, Some(5));
1468        assert!(matches!(
1469            table.expand,
1470            Some(ExpandOptions {
1471                graph: true,
1472                graph_depth: 3,
1473                cross_refs: true,
1474                ..
1475            })
1476        ));
1477
1478        let table = parse_table("SELECT * FROM users AS OF BRANCH 'main'");
1479        assert!(matches!(table.as_of, Some(AsOfClause::Branch(ref v)) if v == "main"));
1480
1481        let table = parse_table("SELECT * FROM users AS OF TAG 'v1'");
1482        assert!(matches!(table.as_of, Some(AsOfClause::Tag(ref v)) if v == "v1"));
1483
1484        let table = parse_table("SELECT * FROM users AS OF TIMESTAMP 1710000000000");
1485        assert!(matches!(
1486            table.as_of,
1487            Some(AsOfClause::TimestampMs(1_710_000_000_000))
1488        ));
1489
1490        let table = parse_table("SELECT * FROM users AS OF SNAPSHOT 42");
1491        assert!(matches!(table.as_of, Some(AsOfClause::Snapshot(42))));
1492
1493        let table = parse_table("SELECT * FROM users WITH EXPAND");
1494        assert!(matches!(
1495            table.expand,
1496            Some(ExpandOptions {
1497                graph: true,
1498                graph_depth: 1,
1499                cross_refs: true,
1500                ..
1501            })
1502        ));
1503
1504        assert!(super::super::parse("SELECT * FROM users AS OF SNAPSHOT -1").is_err());
1505        assert!(super::super::parse("SELECT * FROM users AS OF UNKNOWN 'x'").is_err());
1506    }
1507
1508    #[test]
1509    fn direct_parser_helpers_cover_projection_group_order_and_literals() {
1510        let mut parser = Parser::new("name, LOWER(email) AS email_l").unwrap();
1511        let projections = parser.parse_projection_list().unwrap();
1512        assert_eq!(projections.len(), 2);
1513
1514        let mut parser = Parser::new("dept, TIME_BUCKET(5 m)").unwrap();
1515        let group_by = parser.parse_group_by_list().unwrap();
1516        assert_eq!(group_by, vec!["dept", "TIME_BUCKET(5m)"]);
1517
1518        let mut parser = Parser::new("LOWER(name) DESC, created_at").unwrap();
1519        let order_by = parser.parse_order_by_list().unwrap();
1520        assert_eq!(order_by.len(), 2);
1521        assert!(order_by[0].expr.is_some());
1522        assert!(!order_by[0].ascending);
1523        assert!(order_by[0].nulls_first);
1524        assert!(order_by[1].ascending);
1525        assert!(!order_by[1].nulls_first);
1526
1527        let mut parser = Parser::new("-5 ms").unwrap();
1528        assert_eq!(parser.parse_function_literal_arg().unwrap(), "-5ms");
1529        let mut parser = Parser::new("2.0 H").unwrap();
1530        assert_eq!(parser.parse_function_literal_arg().unwrap(), "2h");
1531        let mut parser = Parser::new("bad").unwrap();
1532        assert!(parser.parse_function_literal_arg().is_err());
1533    }
1534
1535    #[test]
1536    fn from_subquery_source_is_preserved() {
1537        let parsed = super::super::parse("FROM (SELECT id FROM users) AS u RETURN u.id")
1538            .unwrap()
1539            .query;
1540        let QueryExpr::Table(table) = parsed else {
1541            panic!("expected table query");
1542        };
1543        assert_eq!(table.table, "__subq_u");
1544        assert_eq!(table.alias.as_deref(), Some("u"));
1545        assert!(matches!(table.source, Some(TableSource::Subquery(_))));
1546        assert_eq!(table.select_items.len(), 1);
1547
1548        assert!(super::super::parse("FROM (MATCH (n) RETURN n) AS g").is_err());
1549    }
1550
1551    // ── Table-valued function arguments (issues #795 / #796) ──
1552
1553    #[test]
1554    fn louvain_tvf_parses_positional_and_named_args() {
1555        // Bare positional form: louvain(<graph>).
1556        let table = parse_table("SELECT * FROM louvain(g)");
1557        match table.source {
1558            Some(TableSource::Function {
1559                ref name,
1560                ref args,
1561                ref named_args,
1562            }) => {
1563                assert_eq!(name, "louvain");
1564                assert_eq!(args, &vec!["g".to_string()]);
1565                assert!(named_args.is_empty());
1566            }
1567            other => panic!("expected louvain TVF source, got {other:?}"),
1568        }
1569
1570        // Named-argument form: louvain(<graph>, resolution => <f64>).
1571        let table = parse_table("SELECT * FROM louvain(g, resolution => 0.5)");
1572        match table.source {
1573            Some(TableSource::Function {
1574                ref name,
1575                ref args,
1576                ref named_args,
1577            }) => {
1578                assert_eq!(name, "louvain");
1579                assert_eq!(args, &vec!["g".to_string()]);
1580                assert_eq!(named_args.len(), 1);
1581                assert_eq!(named_args[0].0, "resolution");
1582                assert!((named_args[0].1 - 0.5).abs() < f64::EPSILON);
1583            }
1584            other => panic!("expected louvain TVF source, got {other:?}"),
1585        }
1586
1587        // Integer resolution is accepted and coerced to f64.
1588        let table = parse_table("SELECT * FROM louvain(g, resolution => 2)");
1589        match table.source {
1590            Some(TableSource::Function { ref named_args, .. }) => {
1591                assert!((named_args[0].1 - 2.0).abs() < f64::EPSILON);
1592            }
1593            other => panic!("expected louvain TVF source, got {other:?}"),
1594        }
1595    }
1596
1597    // ── Inline graph TVF: `nodes => / edges =>` subqueries (issue #799) ──
1598
1599    #[test]
1600    fn tvf_inline_form_parses_nodes_and_edges_subqueries() {
1601        // The inline form must produce a structurally distinct AST node
1602        // (InlineGraphFunction) from the graph-collection Function form.
1603        let table = parse_table(
1604            "SELECT * FROM components(nodes => (SELECT id FROM hosts), edges => (SELECT src, dst FROM links))",
1605        );
1606        match table.source {
1607            Some(TableSource::InlineGraphFunction {
1608                ref name,
1609                ref nodes,
1610                ref edges,
1611                ref named_args,
1612            }) => {
1613                assert_eq!(name, "components");
1614                assert!(named_args.is_empty());
1615                assert!(matches!(**nodes, QueryExpr::Table(_)));
1616                assert!(matches!(**edges, QueryExpr::Table(_)));
1617            }
1618            other => panic!("expected inline graph TVF source, got {other:?}"),
1619        }
1620    }
1621
1622    #[test]
1623    fn tvf_inline_form_carries_numeric_named_args() {
1624        // `resolution => <f64>` coexists with the inline subqueries.
1625        let table = parse_table(
1626            "SELECT * FROM louvain(nodes => (SELECT id FROM n), edges => (SELECT a, b FROM e), resolution => 0.5)",
1627        );
1628        match table.source {
1629            Some(TableSource::InlineGraphFunction {
1630                ref name,
1631                ref named_args,
1632                ..
1633            }) => {
1634                assert_eq!(name, "louvain");
1635                assert_eq!(named_args.len(), 1);
1636                assert_eq!(named_args[0].0, "resolution");
1637                assert!((named_args[0].1 - 0.5).abs() < f64::EPSILON);
1638            }
1639            other => panic!("expected inline graph TVF source, got {other:?}"),
1640        }
1641    }
1642
1643    #[test]
1644    fn tvf_inline_form_rejects_malformed_shapes() {
1645        // A positional graph argument cannot mix with inline subqueries.
1646        assert!(super::super::parse(
1647            "SELECT * FROM components(g, nodes => (SELECT id FROM n), edges => (SELECT a, b FROM e))"
1648        )
1649        .is_err());
1650        // The inline form requires both `nodes` and `edges`.
1651        assert!(
1652            super::super::parse("SELECT * FROM components(nodes => (SELECT id FROM n))").is_err()
1653        );
1654        assert!(
1655            super::super::parse("SELECT * FROM components(edges => (SELECT a, b FROM e))").is_err()
1656        );
1657        // An unknown subquery key is rejected.
1658        assert!(super::super::parse(
1659            "SELECT * FROM components(nodes => (SELECT id FROM n), verts => (SELECT a, b FROM e))"
1660        )
1661        .is_err());
1662        // A `=>` followed by a non-SELECT parenthesised group is rejected.
1663        assert!(super::super::parse(
1664            "SELECT * FROM components(nodes => (1 + 2), edges => (SELECT a, b FROM e))"
1665        )
1666        .is_err());
1667    }
1668
1669    #[test]
1670    fn shortest_path_tvf_parses_graph_ref_with_scalar_named_args() {
1671        // Required src/dst only.
1672        let table = parse_table("SELECT * FROM shortest_path(g, src => 1, dst => 4)");
1673        match table.source {
1674            Some(TableSource::Function {
1675                ref name,
1676                ref args,
1677                ref named_args,
1678            }) => {
1679                assert_eq!(name, "shortest_path");
1680                assert_eq!(args, &vec!["g".to_string()]);
1681                assert_eq!(named_args.len(), 2);
1682                assert_eq!(named_args[0].0, "src");
1683                assert!((named_args[0].1 - 1.0).abs() < f64::EPSILON);
1684                assert_eq!(named_args[1].0, "dst");
1685                assert!((named_args[1].1 - 4.0).abs() < f64::EPSILON);
1686            }
1687            other => panic!("expected shortest_path TVF source, got {other:?}"),
1688        }
1689
1690        // Optional max_hops named argument is accepted alongside src/dst.
1691        let table =
1692            parse_table("SELECT * FROM shortest_path(g, src => 1, dst => 4, max_hops => 3)");
1693        match table.source {
1694            Some(TableSource::Function { ref named_args, .. }) => {
1695                assert_eq!(named_args.len(), 3);
1696                assert_eq!(named_args[2].0, "max_hops");
1697                assert!((named_args[2].1 - 3.0).abs() < f64::EPSILON);
1698            }
1699            other => panic!("expected shortest_path TVF source, got {other:?}"),
1700        }
1701    }
1702
1703    #[test]
1704    fn centrality_tvfs_parse_positional_and_named_args() {
1705        // Bare positional form for each centrality TVF (issue #797). These flow
1706        // through the generic `ident(args)` path (not a dedicated keyword), so
1707        // the parser records them as a `TableSource::Function`.
1708        for name in ["betweenness", "eigenvector", "pagerank"] {
1709            let table = parse_table(&format!("SELECT * FROM {name}(g)"));
1710            match table.source {
1711                Some(TableSource::Function {
1712                    name: ref got,
1713                    ref args,
1714                    ref named_args,
1715                }) => {
1716                    assert_eq!(got, name);
1717                    assert_eq!(args, &vec!["g".to_string()]);
1718                    assert!(named_args.is_empty());
1719                }
1720                other => panic!("expected {name} TVF source, got {other:?}"),
1721            }
1722        }
1723
1724        // eigenvector(<graph>, max_iterations => <i64>, tolerance => <f64>).
1725        let table =
1726            parse_table("SELECT * FROM eigenvector(g, max_iterations => 50, tolerance => 0.0001)");
1727        match table.source {
1728            Some(TableSource::Function { ref named_args, .. }) => {
1729                assert_eq!(named_args.len(), 2);
1730                assert_eq!(named_args[0].0, "max_iterations");
1731                assert!((named_args[0].1 - 50.0).abs() < f64::EPSILON);
1732                assert_eq!(named_args[1].0, "tolerance");
1733                assert!((named_args[1].1 - 0.0001).abs() < f64::EPSILON);
1734            }
1735            other => panic!("expected eigenvector TVF source, got {other:?}"),
1736        }
1737
1738        // pagerank(<graph>, damping => <f64>, max_iterations => <i64>).
1739        let table =
1740            parse_table("SELECT * FROM pagerank(g, damping => 0.85, max_iterations => 100)");
1741        match table.source {
1742            Some(TableSource::Function {
1743                ref args,
1744                ref named_args,
1745                ..
1746            }) => {
1747                assert_eq!(args, &vec!["g".to_string()]);
1748                assert_eq!(named_args.len(), 2);
1749                assert_eq!(named_args[0].0, "damping");
1750                assert!((named_args[0].1 - 0.85).abs() < f64::EPSILON);
1751                assert_eq!(named_args[1].0, "max_iterations");
1752                assert!((named_args[1].1 - 100.0).abs() < f64::EPSILON);
1753            }
1754            other => panic!("expected pagerank TVF source, got {other:?}"),
1755        }
1756    }
1757
1758    #[test]
1759    fn tvf_named_arg_grammar_rejects_malformed_forms() {
1760        // A positional argument after a named argument is rejected.
1761        assert!(super::super::parse("SELECT * FROM louvain(g, resolution => 0.5, h)").is_err());
1762        // `=>` must be followed by a number.
1763        assert!(super::super::parse("SELECT * FROM louvain(g, resolution => foo)").is_err());
1764        // Zero-argument form is still rejected (issue #795 invariant).
1765        assert!(super::super::parse("SELECT * FROM louvain()").is_err());
1766    }
1767
1768    // ── SESSIONIZE operator (issue #585 slice 8) ──
1769
1770    #[test]
1771    fn test_parse_sessionize_full_clause() {
1772        let q = parse_table(
1773            "SELECT user_id, ts FROM events SESSIONIZE BY user_id GAP 30 m ORDER BY ts",
1774        );
1775        let s = q.sessionize.expect("sessionize present");
1776        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1777        assert_eq!(s.gap_ms, Some(30 * 60_000));
1778        assert_eq!(s.order_col.as_deref(), Some("ts"));
1779    }
1780
1781    #[test]
1782    fn test_parse_sessionize_omits_optional_order_by() {
1783        let q = parse_table("SELECT * FROM events SESSIONIZE BY user_id GAP 5 s");
1784        let s = q.sessionize.expect("sessionize present");
1785        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1786        assert_eq!(s.gap_ms, Some(5_000));
1787        assert!(s.order_col.is_none());
1788    }
1789
1790    #[test]
1791    fn test_parse_sessionize_bare_defers_to_descriptor() {
1792        // Both BY and GAP omitted — parser accepts the shape; the
1793        // executor raises MissingSessionKey when the descriptor
1794        // doesn't supply defaults.
1795        let q = parse_table("SELECT * FROM events SESSIONIZE");
1796        let s = q.sessionize.expect("sessionize present");
1797        assert!(s.actor_col.is_none());
1798        assert!(s.gap_ms.is_none());
1799        assert!(s.order_col.is_none());
1800    }
1801
1802    #[test]
1803    fn test_parse_sessionize_composes_with_where_and_limit() {
1804        let q = parse_table(
1805            "SELECT user_id FROM events \
1806             SESSIONIZE BY user_id GAP 1 m \
1807             WHERE user_id = 'u1' LIMIT 10",
1808        );
1809        let s = q.sessionize.expect("sessionize present");
1810        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1811        assert_eq!(s.gap_ms, Some(60_000));
1812        assert!(q.where_expr.is_some(), "WHERE still parsed");
1813        assert_eq!(q.limit, Some(10));
1814    }
1815
1816    #[test]
1817    fn test_parse_sessionize_absent_leaves_field_none() {
1818        let q = parse_table("SELECT * FROM events");
1819        assert!(q.sessionize.is_none());
1820    }
1821
1822    #[test]
1823    fn test_parse_sessionize_with_session_id_in_projection_e2e_shape() {
1824        // Matches the literal shape e2e tests use — session_id in the
1825        // projection list must not confuse the parser.
1826        let q = parse_table(
1827            "SELECT id, user_id, ts, session_id FROM events \
1828             SESSIONIZE BY user_id GAP 30 s ORDER BY ts",
1829        );
1830        let s = q.sessionize.expect("sessionize present");
1831        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1832        assert_eq!(s.gap_ms, Some(30_000));
1833    }
1834}