Skip to main content

reddb_server/storage/query/parser/
table.rs

1//! Table query parsing (SELECT ... FROM ...)
2
3use super::super::ast::{
4    BinOp, CompareOp, Expr, FieldRef, Filter, OrderByClause, Projection, QueryExpr,
5    QueueSelectQuery, SelectItem, Span, TableQuery, UnaryOp,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use crate::storage::query::sql_lowering::{
10    expr_to_projection, filter_to_expr, select_item_to_projection,
11};
12use crate::storage::schema::Value;
13
/// Reports whether `name` appears in the catalog of function names accepted
/// in scalar position.
///
/// The comparison is exact and case-sensitive; every catalog entry is spelled
/// in upper case. Several aggregate-style names (`STDDEV`, `FIRST`, ...) are
/// listed here as well as in `is_aggregate_function`.
fn is_scalar_function(name: &str) -> bool {
    const SCALAR_FUNCTIONS: &[&str] = &[
        // Geo helpers
        "GEO_DISTANCE",
        "GEO_DISTANCE_VINCENTY",
        "GEO_BEARING",
        "GEO_MIDPOINT",
        "HAVERSINE",
        "VINCENTY",
        // Time
        "TIME_BUCKET",
        // String helpers
        "UPPER",
        "LOWER",
        "LENGTH",
        "CHAR_LENGTH",
        "CHARACTER_LENGTH",
        "OCTET_LENGTH",
        "BIT_LENGTH",
        "SUBSTRING",
        "SUBSTR",
        "POSITION",
        "TRIM",
        "LTRIM",
        "RTRIM",
        "BTRIM",
        "CONCAT",
        "CONCAT_WS",
        "REVERSE",
        "LEFT",
        "RIGHT",
        "QUOTE_LITERAL",
        // Numeric / null handling
        "ABS",
        "ROUND",
        "COALESCE",
        // Aggregate-style names also accepted here
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
        // Money
        "MONEY",
        "MONEY_ASSET",
        "MONEY_MINOR",
        "MONEY_SCALE",
        // Misc
        "VERIFY_PASSWORD",
        "CAST",
        "CASE",
    ];
    SCALAR_FUNCTIONS.contains(&name)
}
66
/// Reports whether `name` appears in the catalog of aggregate function names.
///
/// The comparison is exact and case-sensitive; callers in this file upper-case
/// the name before asking.
fn is_aggregate_function(name: &str) -> bool {
    const AGGREGATE_FUNCTIONS: &[&str] = &[
        "COUNT",
        "AVG",
        "SUM",
        "MIN",
        "MAX",
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
    ];
    AGGREGATE_FUNCTIONS.contains(&name)
}
87
88fn aggregate_token_name(token: &Token) -> Option<&'static str> {
89    match token {
90        Token::Count => Some("COUNT"),
91        Token::Sum => Some("SUM"),
92        Token::Avg => Some("AVG"),
93        Token::Min => Some("MIN"),
94        Token::Max => Some("MAX"),
95        Token::First => Some("FIRST"),
96        Token::Last => Some("LAST"),
97        _ => None,
98    }
99}
100
101fn scalar_token_name(token: &Token) -> Option<&'static str> {
102    match token {
103        Token::Left => Some("LEFT"),
104        Token::Right => Some("RIGHT"),
105        _ => None,
106    }
107}
108use super::Parser;
109
110impl<'a> Parser<'a> {
111    /// Parse SELECT ... FROM ... query
112    pub fn parse_select_query(&mut self) -> Result<QueryExpr, ParseError> {
113        // Recursion guard: nested subqueries (UNION, derived tables,
114        // EXISTS) re-enter through this point, so depth here bounds
115        // the SELECT-shaped recursion in addition to the expr Pratt
116        // climb guarded in `parse_expr_prec`.
117        self.enter_depth()?;
118        let result = self.parse_select_query_inner();
119        self.exit_depth();
120        result
121    }
122
123    fn parse_select_query_inner(&mut self) -> Result<QueryExpr, ParseError> {
124        self.expect(Token::Select)?;
125
126        // Parse column list
127        let (select_items, columns) = self.parse_select_items_and_projections()?;
128
129        // Parse optional table source. If omitted, default to `ANY` so the query
130        // can return mixed entities (table, document, graph, and vector) by default.
131        let has_from = self.consume(&Token::From)?;
132        let table = if has_from {
133            if self.consume(&Token::Queue)? {
134                let queue = self.expect_ident()?;
135                let filter = if self.consume(&Token::Where)? {
136                    Some(self.parse_filter()?)
137                } else {
138                    None
139                };
140                let limit = if self.consume(&Token::Limit)? {
141                    Some(self.parse_integer()? as u64)
142                } else {
143                    None
144                };
145                return Ok(QueryExpr::QueueSelect(QueueSelectQuery {
146                    queue,
147                    columns: queue_projection_columns(&columns)?,
148                    filter,
149                    limit,
150                }));
151            } else if self.consume(&Token::Star)? {
152                "*".to_string()
153            } else if self.consume(&Token::All)? {
154                "all".to_string()
155            } else {
156                self.expect_ident()?
157            }
158        } else {
159            "any".to_string()
160        };
161
162        // Parse optional alias (only when a FROM clause exists).
163        // `AS OF` is a clause — don't gobble the `AS` as an alias
164        // marker when the following token is `OF`.
165        let alias =
166            if !has_from || (self.check(&Token::As) && matches!(self.peek_next()?, Token::Of)) {
167                None
168            } else if self.consume(&Token::As)?
169                || (self.check(&Token::Ident("".into())) && !self.is_clause_keyword())
170            {
171                Some(self.expect_ident()?)
172            } else {
173                None
174            };
175
176        let mut query = TableQuery {
177            table,
178            source: None,
179            alias,
180            select_items,
181            columns,
182            where_expr: None,
183            filter: None,
184            group_by_exprs: Vec::new(),
185            group_by: Vec::new(),
186            having_expr: None,
187            having: None,
188            order_by: Vec::new(),
189            limit: None,
190            limit_param: None,
191            offset: None,
192            offset_param: None,
193            expand: None,
194            as_of: None,
195            sessionize: None,
196        };
197
198        if self.is_join_keyword() {
199            let return_items = std::mem::take(&mut query.select_items);
200            let return_ = std::mem::take(&mut query.columns);
201            let mut expr = self.parse_join_query(QueryExpr::Table(query))?;
202            if let QueryExpr::Join(join) = &mut expr {
203                join.return_items = return_items;
204                join.return_ = return_;
205            }
206            return Ok(expr);
207        }
208
209        // SESSIONIZE BY <ident> GAP <duration> [ORDER BY <ident>]
210        // — issue #585 slice 8. Parsed before WHERE/GROUP BY so the
211        // optional inner ORDER BY (which the user binds to the
212        // operator's timestamp axis) cannot be confused with the
213        // SELECT's top-level ORDER BY further down. Both `BY` and
214        // `GAP` may be omitted when the source collection's
215        // descriptor carries `SESSION_KEY` / `SESSION_GAP` defaults
216        // (slice 1) — the executor resolves them at run time and
217        // raises `MissingSessionKey` if neither side supplies a
218        // value.
219        if self.consume(&Token::Sessionize)? {
220            query.sessionize = Some(self.parse_sessionize_clause()?);
221        }
222
223        // Parse optional clauses
224        self.parse_table_clauses(&mut query)?;
225
226        Ok(QueryExpr::Table(query))
227    }
228
229    fn parse_sessionize_clause(
230        &mut self,
231    ) -> Result<crate::storage::query::ast::SessionizeClause, ParseError> {
232        use crate::storage::query::ast::SessionizeClause;
233
234        let mut clause = SessionizeClause::default();
235
236        if self.consume(&Token::By)? {
237            clause.actor_col = Some(self.expect_ident()?);
238        }
239        if self.consume(&Token::Gap)? {
240            let value = self.parse_float()?;
241            let unit = self.parse_duration_unit()?;
242            clause.gap_ms = Some((value * unit) as u64);
243        }
244        // Optional `ORDER BY <ident>` immediately after GAP. The
245        // top-level SELECT ORDER BY parsed by `parse_table_clauses`
246        // sees the next ORDER token, so this only consumes the one
247        // immediately attached to SESSIONIZE.
248        if self.consume(&Token::Order)? {
249            self.expect(Token::By)?;
250            clause.order_col = Some(self.expect_ident()?);
251        }
252        Ok(clause)
253    }
254}
255
256impl<'a> Parser<'a> {
257    /// Check if current identifier is a clause keyword
258    pub fn is_clause_keyword(&self) -> bool {
259        matches!(
260            self.peek(),
261            Token::Where
262                | Token::Order
263                | Token::Limit
264                | Token::Offset
265                | Token::Join
266                | Token::Inner
267                | Token::Left
268                | Token::Right
269                | Token::As
270                | Token::Sessionize
271        )
272    }
273
274    /// Parse projection list (column selections)
275    pub fn parse_projection_list(&mut self) -> Result<Vec<Projection>, ParseError> {
276        Ok(self.parse_select_items_and_projections()?.1)
277    }
278
279    pub(crate) fn parse_select_items_and_projections(
280        &mut self,
281    ) -> Result<(Vec<SelectItem>, Vec<Projection>), ParseError> {
282        // Handle SELECT *
283        if self.consume(&Token::Star)? {
284            return Ok((vec![SelectItem::Wildcard], Vec::new())); // Empty legacy vec means all columns
285        }
286
287        let mut select_items = Vec::new();
288        let mut projections = Vec::new();
289        loop {
290            let (item, proj) = self.parse_projection()?;
291            select_items.push(item);
292            projections.push(proj);
293
294            if !self.consume(&Token::Comma)? {
295                break;
296            }
297        }
298        Ok((select_items, projections))
299    }
300
301    /// Parse a single projection — supports columns, aggregate functions, and scalar functions
302    fn parse_projection(&mut self) -> Result<(SelectItem, Projection), ParseError> {
303        let expr = self.parse_expr()?;
304        if contains_nested_aggregate(&expr) && !is_plain_aggregate_expr(&expr) {
305            return Err(ParseError::new(
306                "aggregate function is not valid inside another expression".to_string(),
307                self.position(),
308            ));
309        }
310        let alias = if self.consume(&Token::As)? {
311            Some(self.expect_column_ident()?)
312        } else {
313            None
314        };
315        let select_item = SelectItem::Expr {
316            expr: expr.clone(),
317            alias: alias.clone(),
318        };
319        let projection = select_item_to_projection(&select_item).ok_or_else(|| {
320            ParseError::new(
321                "projection cannot yet be lowered to legacy runtime representation".to_string(),
322                self.position(),
323            )
324        })?;
325        Ok((select_item, projection))
326    }
327}
328
329fn contains_nested_aggregate(expr: &Expr) -> bool {
330    match expr {
331        Expr::FunctionCall { name, args, .. } => {
332            is_aggregate_function(&name.to_uppercase())
333                || args.iter().any(contains_nested_aggregate)
334        }
335        // Issue #589 slice 7a: a window function aggregate (e.g.
336        // `SUM(x) OVER (...)`) is NOT a plain aggregate from the
337        // group-by analyser's point of view — it operates over a
338        // partitioned window, not a GROUP BY group. We still recurse
339        // into args / partition / order keys so a *nested* aggregate
340        // (e.g. `SUM(COUNT(*) OVER ()) OVER (...)`) is caught.
341        Expr::WindowFunctionCall { args, window, .. } => {
342            args.iter().any(contains_nested_aggregate)
343                || window.partition_by.iter().any(contains_nested_aggregate)
344                || window
345                    .order_by
346                    .iter()
347                    .any(|o| contains_nested_aggregate(&o.expr))
348        }
349        Expr::BinaryOp { lhs, rhs, .. } => {
350            contains_nested_aggregate(lhs) || contains_nested_aggregate(rhs)
351        }
352        Expr::UnaryOp { operand, .. } | Expr::IsNull { operand, .. } => {
353            contains_nested_aggregate(operand)
354        }
355        Expr::Cast { inner, .. } => contains_nested_aggregate(inner),
356        Expr::Case {
357            branches, else_, ..
358        } => {
359            branches.iter().any(|(cond, value)| {
360                contains_nested_aggregate(cond) || contains_nested_aggregate(value)
361            }) || else_.as_deref().is_some_and(contains_nested_aggregate)
362        }
363        Expr::InList { target, values, .. } => {
364            contains_nested_aggregate(target) || values.iter().any(contains_nested_aggregate)
365        }
366        Expr::Between {
367            target, low, high, ..
368        } => {
369            contains_nested_aggregate(target)
370                || contains_nested_aggregate(low)
371                || contains_nested_aggregate(high)
372        }
373        Expr::Literal { .. }
374        | Expr::Column { .. }
375        | Expr::Parameter { .. }
376        | Expr::Subquery { .. } => false,
377    }
378}
379
380fn is_plain_aggregate_expr(expr: &Expr) -> bool {
381    match expr {
382        Expr::FunctionCall { name, args, .. } if is_aggregate_function(&name.to_uppercase()) => {
383            !args.iter().any(contains_nested_aggregate)
384        }
385        _ => false,
386    }
387}
388
389fn attach_projection_alias(proj: Projection, alias: Option<String>) -> Projection {
390    let Some(alias) = alias else { return proj };
391    match proj {
392        Projection::Field(field, _) => Projection::Field(field, Some(alias)),
393        Projection::Expression(filter, _) => Projection::Expression(filter, Some(alias)),
394        Projection::Function(name, args) => {
395            if name.contains(':') {
396                Projection::Function(name, args)
397            } else {
398                Projection::Function(format!("{name}:{alias}"), args)
399            }
400        }
401        Projection::Column(column) => Projection::Alias(column, alias),
402        Projection::Window {
403            name, args, window, ..
404        } => Projection::Window {
405            name,
406            args,
407            window,
408            alias: Some(alias),
409        },
410        other => other,
411    }
412}
413
414fn queue_projection_columns(columns: &[Projection]) -> Result<Vec<String>, ParseError> {
415    let mut out = Vec::new();
416    for column in columns {
417        match column {
418            Projection::Column(name) => out.push(name.clone()),
419            Projection::Alias(name, _) => out.push(name.clone()),
420            Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
421                out.push(column.clone());
422            }
423            Projection::All => return Ok(Vec::new()),
424            other => {
425                return Err(ParseError::new(
426                    format!(
427                        "unsupported SELECT FROM QUEUE projection {other:?}; use `SELECT *` or bare column names, or use queue verbs (PUSH, POP, PEEK, LEN, ACK, NACK, …) for queue operations"
428                    ),
429                    crate::storage::query::lexer::Position::default(),
430                ));
431            }
432        }
433    }
434    Ok(out)
435}
436
437impl<'a> Parser<'a> {
438    /// Parse table query clauses (AS OF, WHERE, GROUP BY, HAVING, ORDER BY, LIMIT, OFFSET)
439    pub fn parse_table_clauses(&mut self, query: &mut TableQuery) -> Result<(), ParseError> {
440        // AS OF clause — time-travel anchor. Must come before WHERE
441        // so the executor can bind the snapshot before filter eval.
442        if self.check(&Token::As) {
443            let next_is_of = matches!(self.peek_next()?, Token::Of);
444            if next_is_of {
445                self.expect(Token::As)?;
446                self.expect(Token::Of)?;
447                query.as_of = Some(self.parse_as_of_spec()?);
448            }
449        }
450
451        // WHERE clause
452        if self.consume(&Token::Where)? {
453            let filter = self.parse_filter()?;
454            query.where_expr = Some(filter_to_expr(&filter));
455            query.filter = Some(filter);
456        }
457
458        // GROUP BY clause
459        if self.consume(&Token::Group)? {
460            self.expect(Token::By)?;
461            let (group_by_exprs, group_by) = self.parse_group_by_items()?;
462            query.group_by_exprs = group_by_exprs;
463            query.group_by = group_by;
464        }
465
466        // HAVING clause (only valid after GROUP BY)
467        if !query.group_by_exprs.is_empty() && self.consume_ident_ci("HAVING")? {
468            let having = self.parse_filter()?;
469            query.having_expr = Some(filter_to_expr(&having));
470            query.having = Some(having);
471        }
472
473        // ORDER BY clause
474        if self.consume(&Token::Order)? {
475            self.expect(Token::By)?;
476            query.order_by = self.parse_order_by_list()?;
477        }
478
479        // LIMIT clause
480        if self.consume(&Token::Limit)? {
481            if matches!(self.peek(), Token::Dollar | Token::Question) {
482                query.limit_param = Some(self.parse_param_slot("LIMIT")?);
483                query.limit = None;
484            } else {
485                query.limit = Some(self.parse_integer()? as u64);
486            }
487        }
488
489        // OFFSET clause
490        if self.consume(&Token::Offset)? {
491            if matches!(self.peek(), Token::Dollar | Token::Question) {
492                query.offset_param = Some(self.parse_param_slot("OFFSET")?);
493                query.offset = None;
494            } else {
495                query.offset = Some(self.parse_integer()? as u64);
496            }
497        }
498
499        // WITH EXPAND clause
500        if self.consume(&Token::With)? && self.consume_ident_ci("EXPAND")? {
501            query.expand = Some(self.parse_expand_options()?);
502        }
503
504        Ok(())
505    }
506
507    /// Parse an AS OF spec after `AS OF` has already been consumed.
508    /// Grammar:
509    ///   AS OF COMMIT   '<hex>'
510    ///   AS OF BRANCH   '<name>'
511    ///   AS OF TAG      '<name>'
512    ///   AS OF TIMESTAMP <integer-ms>
513    ///   AS OF SNAPSHOT  <xid>
514    fn parse_as_of_spec(&mut self) -> Result<crate::storage::query::ast::AsOfClause, ParseError> {
515        use crate::storage::query::ast::AsOfClause;
516
517        // Keyword — accept both tokenized forms (e.g. Token::Commit
518        // if present) and bare identifiers for flexibility.
519        let keyword = match self.peek() {
520            Token::Ident(s) => {
521                let s = s.to_ascii_uppercase();
522                self.advance()?;
523                s
524            }
525            Token::Commit => {
526                self.advance()?;
527                "COMMIT".to_string()
528            }
529            other => {
530                return Err(ParseError::expected(
531                    vec!["COMMIT", "BRANCH", "TAG", "TIMESTAMP", "SNAPSHOT"],
532                    other,
533                    self.position(),
534                ));
535            }
536        };
537
538        match keyword.as_str() {
539            "COMMIT" => {
540                let value = self.parse_string()?;
541                Ok(AsOfClause::Commit(value))
542            }
543            "BRANCH" => {
544                let value = self.parse_string()?;
545                Ok(AsOfClause::Branch(value))
546            }
547            "TAG" => {
548                let value = self.parse_string()?;
549                Ok(AsOfClause::Tag(value))
550            }
551            "TIMESTAMP" => {
552                let value = self.parse_integer()?;
553                Ok(AsOfClause::TimestampMs(value))
554            }
555            "SNAPSHOT" => {
556                let value = self.parse_integer()?;
557                if value < 0 {
558                    return Err(ParseError::new(
559                        "AS OF SNAPSHOT requires non-negative xid".to_string(),
560                        self.position(),
561                    ));
562                }
563                Ok(AsOfClause::Snapshot(value as u64))
564            }
565            other => Err(ParseError::expected(
566                vec!["COMMIT", "BRANCH", "TAG", "TIMESTAMP", "SNAPSHOT"],
567                &Token::Ident(other.into()),
568                self.position(),
569            )),
570        }
571    }
572
573    /// Parse EXPAND options: GRAPH [DEPTH n], CROSS_REFS, ALL
574    fn parse_expand_options(
575        &mut self,
576    ) -> Result<crate::storage::query::ast::ExpandOptions, ParseError> {
577        use crate::storage::query::ast::ExpandOptions;
578        let mut opts = ExpandOptions::default();
579
580        loop {
581            if self.consume(&Token::Graph)? || self.consume_ident_ci("GRAPH")? {
582                opts.graph = true;
583                opts.graph_depth = if self.consume(&Token::Depth)? {
584                    self.parse_integer()? as usize
585                } else {
586                    1
587                };
588            } else if self.consume_ident_ci("CROSS_REFS")?
589                || self.consume_ident_ci("CROSSREFS")?
590                || self.consume_ident_ci("REFS")?
591            {
592                opts.cross_refs = true;
593            } else if self.consume(&Token::All)? || self.consume_ident_ci("ALL")? {
594                opts.graph = true;
595                opts.cross_refs = true;
596                opts.graph_depth = 1;
597            } else {
598                break;
599            }
600            if !self.consume(&Token::Comma)? {
601                break;
602            }
603        }
604
605        if !opts.graph && !opts.cross_refs {
606            opts.graph = true;
607            opts.cross_refs = true;
608            opts.graph_depth = 1;
609        }
610
611        Ok(opts)
612    }
613
614    /// Parse GROUP BY field list
615    pub fn parse_group_by_list(&mut self) -> Result<Vec<String>, ParseError> {
616        Ok(self.parse_group_by_items()?.1)
617    }
618
619    fn parse_group_by_items(&mut self) -> Result<(Vec<Expr>, Vec<String>), ParseError> {
620        let mut exprs = Vec::new();
621        let mut fields = Vec::new();
622        loop {
623            let expr = self.parse_expr()?;
624            let rendered = render_group_by_expr(&expr).ok_or_else(|| {
625                ParseError::new(
626                    "GROUP BY expression cannot yet be lowered to legacy runtime representation"
627                        .to_string(),
628                    self.position(),
629                )
630            })?;
631            exprs.push(expr);
632            fields.push(rendered);
633            if !self.consume(&Token::Comma)? {
634                break;
635            }
636        }
637        Ok((exprs, fields))
638    }
639
640    /// Parse ORDER BY list.
641    ///
642    /// Fase 1.6 unlock: uses the new `Expr` Pratt parser so
643    /// `ORDER BY CAST(age AS INT)`, `ORDER BY a + b * 2`,
644    /// `ORDER BY last_seen - created_at` all parse cleanly. If the
645    /// parsed expression is a bare `Column`, we store it in the
646    /// legacy `field` slot and leave `expr` None so downstream
647    /// consumers (planner cost, mode translators) keep using the
648    /// fast path. Otherwise we stash the full tree in `expr` and
649    /// populate `field` with a synthetic marker that runtime code
650    /// never touches.
651    pub fn parse_order_by_list(&mut self) -> Result<Vec<OrderByClause>, ParseError> {
652        use super::super::ast::Expr as AstExpr;
653        let mut clauses = Vec::new();
654        loop {
655            let parsed = self.parse_expr()?;
656            let (field, expr_slot) = match parsed {
657                AstExpr::Column { field, .. } => (field, None),
658                other => (
659                    // Synthetic placeholder so legacy pattern-matches
660                    // on `OrderByClause.field` still destructure.
661                    // Runtime comparators check `expr` first when set,
662                    // so the sentinel never gets resolved against a
663                    // real record.
664                    FieldRef::TableColumn {
665                        table: String::new(),
666                        column: String::new(),
667                    },
668                    Some(other),
669                ),
670            };
671
672            let ascending = if self.consume(&Token::Desc)? {
673                false
674            } else {
675                self.consume(&Token::Asc)?;
676                true
677            };
678
679            let nulls_first = if self.consume(&Token::Nulls)? {
680                if self.consume(&Token::First)? {
681                    true
682                } else {
683                    self.expect(Token::Last)?;
684                    false
685                }
686            } else {
687                !ascending // Default: nulls last for ASC, first for DESC
688            };
689
690            clauses.push(OrderByClause {
691                field,
692                expr: expr_slot,
693                ascending,
694                nulls_first,
695            });
696
697            if !self.consume(&Token::Comma)? {
698                break;
699            }
700        }
701        Ok(clauses)
702    }
703
704    fn parse_function_literal_arg(&mut self) -> Result<String, ParseError> {
705        let negative = self.consume(&Token::Dash)?;
706        let mut literal = match self.advance()? {
707            Token::Integer(n) => {
708                if negative {
709                    format!("-{n}")
710                } else {
711                    n.to_string()
712                }
713            }
714            Token::Float(n) => {
715                let value = if negative { -n } else { n };
716                if value.fract().abs() < f64::EPSILON {
717                    format!("{}", value as i64)
718                } else {
719                    value.to_string()
720                }
721            }
722            other => {
723                return Err(ParseError::new(
724                    // F-05: `other` is a `Token` whose Display arms emit raw
725                    // user bytes for `Ident` / `String` / `JsonLiteral`.
726                    // Render via `{:?}` so CR/LF/NUL/quotes are escaped
727                    // before the message reaches downstream serialization
728                    // sinks.
729                    format!("expected number, got {:?}", other),
730                    self.position(),
731                ));
732            }
733        };
734
735        if let Token::Ident(unit) = self.peek().clone() {
736            if is_duration_unit(&unit) {
737                self.advance()?;
738                literal.push_str(&unit.to_ascii_lowercase());
739            }
740        }
741
742        Ok(literal)
743    }
744}
745
/// Reports whether `unit` spells a supported duration unit, ignoring ASCII
/// case (milliseconds, seconds, minutes, hours or days, in short and long
/// forms).
fn is_duration_unit(unit: &str) -> bool {
    const DURATION_UNITS: &[&str] = &[
        // milliseconds
        "ms",
        "msec",
        "millisecond",
        "milliseconds",
        // seconds
        "s",
        "sec",
        "secs",
        "second",
        "seconds",
        // minutes
        "m",
        "min",
        "mins",
        "minute",
        "minutes",
        // hours
        "h",
        "hr",
        "hrs",
        "hour",
        "hours",
        // days
        "d",
        "day",
        "days",
    ];
    DURATION_UNITS
        .iter()
        .any(|known| unit.eq_ignore_ascii_case(known))
}
772
773fn render_group_by_expr(expr: &Expr) -> Option<String> {
774    match expr {
775        Expr::Column { field, .. } => match field {
776            FieldRef::TableColumn { table, column } if table.is_empty() => Some(column.clone()),
777            FieldRef::TableColumn { table, column } => Some(format!("{table}.{column}")),
778            other => Some(format!("{other:?}")),
779        },
780        Expr::FunctionCall { name, args, .. } if name.eq_ignore_ascii_case("TIME_BUCKET") => {
781            let rendered = args
782                .iter()
783                .map(render_group_by_expr)
784                .collect::<Option<Vec<_>>>()?;
785            Some(format!("TIME_BUCKET({})", rendered.join(",")))
786        }
787        Expr::Literal { value, .. } => Some(match value {
788            Value::Null => String::new(),
789            Value::Text(text) => text.to_string(),
790            other => other.to_string(),
791        }),
792        _ => expr_to_projection(expr).map(|projection| match projection {
793            Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
794                column
795            }
796            Projection::Field(FieldRef::TableColumn { table, column }, _) => {
797                format!("{table}.{column}")
798            }
799            Projection::Function(name, args) => {
800                let rendered = args
801                    .iter()
802                    .map(render_group_by_function_arg)
803                    .collect::<Option<Vec<_>>>()
804                    .unwrap_or_default();
805                format!(
806                    "{}({})",
807                    name.split(':').next().unwrap_or(&name),
808                    rendered.join(",")
809                )
810            }
811            Projection::Column(column) | Projection::Alias(column, _) => column,
812            Projection::All => "*".to_string(),
813            Projection::Expression(_, _) => "expr".to_string(),
814            Projection::Field(other, _) => format!("{other:?}"),
815            Projection::Window { name, .. } => name,
816        }),
817    }
818}
819
820fn render_group_by_function_arg(arg: &Projection) -> Option<String> {
821    match arg {
822        Projection::Column(col) => Some(
823            col.strip_prefix("LIT:")
824                .map(str::to_string)
825                .unwrap_or_else(|| col.clone()),
826        ),
827        Projection::All => Some("*".to_string()),
828        _ => None,
829    }
830}
831
832#[cfg(test)]
833mod tests {
834    use super::*;
835    use crate::storage::query::ast::{AsOfClause, BinOp, CompareOp, ExpandOptions, TableSource};
836
837    fn parse_table(sql: &str) -> TableQuery {
838        let parsed = super::super::parse(sql).unwrap().query;
839        let QueryExpr::Table(table) = parsed else {
840            panic!("expected table query");
841        };
842        table
843    }
844
845    fn col(name: &str) -> Expr {
846        Expr::Column {
847            field: FieldRef::TableColumn {
848                table: String::new(),
849                column: name.to_string(),
850            },
851            span: Span::synthetic(),
852        }
853    }
854
855    #[test]
856    fn helper_function_catalogs_cover_all_names() {
857        for name in [
858            "GEO_DISTANCE",
859            "GEO_DISTANCE_VINCENTY",
860            "GEO_BEARING",
861            "GEO_MIDPOINT",
862            "HAVERSINE",
863            "VINCENTY",
864            "TIME_BUCKET",
865            "UPPER",
866            "LOWER",
867            "LENGTH",
868            "CHAR_LENGTH",
869            "CHARACTER_LENGTH",
870            "OCTET_LENGTH",
871            "BIT_LENGTH",
872            "SUBSTRING",
873            "SUBSTR",
874            "POSITION",
875            "TRIM",
876            "LTRIM",
877            "RTRIM",
878            "BTRIM",
879            "CONCAT",
880            "CONCAT_WS",
881            "REVERSE",
882            "LEFT",
883            "RIGHT",
884            "QUOTE_LITERAL",
885            "ABS",
886            "ROUND",
887            "COALESCE",
888            "STDDEV",
889            "VARIANCE",
890            "MEDIAN",
891            "PERCENTILE",
892            "GROUP_CONCAT",
893            "STRING_AGG",
894            "FIRST",
895            "LAST",
896            "ARRAY_AGG",
897            "COUNT_DISTINCT",
898            "MONEY",
899            "MONEY_ASSET",
900            "MONEY_MINOR",
901            "MONEY_SCALE",
902            "VERIFY_PASSWORD",
903            "CAST",
904            "CASE",
905        ] {
906            assert!(is_scalar_function(name), "{name}");
907        }
908        assert!(!is_scalar_function("NOT_A_FUNCTION"));
909
910        for name in [
911            "COUNT",
912            "AVG",
913            "SUM",
914            "MIN",
915            "MAX",
916            "STDDEV",
917            "VARIANCE",
918            "MEDIAN",
919            "PERCENTILE",
920            "GROUP_CONCAT",
921            "STRING_AGG",
922            "FIRST",
923            "LAST",
924            "ARRAY_AGG",
925            "COUNT_DISTINCT",
926        ] {
927            assert!(is_aggregate_function(name), "{name}");
928        }
929        assert!(!is_aggregate_function("LOWER"));
930
931        assert_eq!(aggregate_token_name(&Token::Count), Some("COUNT"));
932        assert_eq!(aggregate_token_name(&Token::Sum), Some("SUM"));
933        assert_eq!(aggregate_token_name(&Token::Avg), Some("AVG"));
934        assert_eq!(aggregate_token_name(&Token::Min), Some("MIN"));
935        assert_eq!(aggregate_token_name(&Token::Max), Some("MAX"));
936        assert_eq!(aggregate_token_name(&Token::First), Some("FIRST"));
937        assert_eq!(aggregate_token_name(&Token::Last), Some("LAST"));
938        assert_eq!(aggregate_token_name(&Token::Ident("COUNT".into())), None);
939
940        assert_eq!(scalar_token_name(&Token::Left), Some("LEFT"));
941        assert_eq!(scalar_token_name(&Token::Right), Some("RIGHT"));
942        assert_eq!(scalar_token_name(&Token::Ident("LEFT".into())), None);
943
944        for unit in [
945            "ms",
946            "msec",
947            "millisecond",
948            "milliseconds",
949            "s",
950            "sec",
951            "secs",
952            "second",
953            "seconds",
954            "m",
955            "min",
956            "mins",
957            "minute",
958            "minutes",
959            "h",
960            "hr",
961            "hrs",
962            "hour",
963            "hours",
964            "d",
965            "day",
966            "days",
967        ] {
968            assert!(is_duration_unit(unit), "{unit}");
969        }
970        assert!(!is_duration_unit("fortnight"));
971    }
972
973    #[test]
974    fn projection_and_group_render_helpers_cover_aliases_and_exprs() {
975        let field = FieldRef::TableColumn {
976            table: String::new(),
977            column: "name".into(),
978        };
979        let filter = Filter::Compare {
980            field: field.clone(),
981            op: CompareOp::Eq,
982            value: Value::text("alice"),
983        };
984
985        assert_eq!(
986            attach_projection_alias(Projection::Field(field.clone(), None), Some("n".into())),
987            Projection::Field(field.clone(), Some("n".into()))
988        );
989        assert_eq!(
990            attach_projection_alias(
991                Projection::Expression(Box::new(filter.clone()), None),
992                Some("ok".into())
993            ),
994            Projection::Expression(Box::new(filter), Some("ok".into()))
995        );
996        assert_eq!(
997            attach_projection_alias(
998                Projection::Function("LOWER".into(), vec![]),
999                Some("l".into())
1000            ),
1001            Projection::Function("LOWER:l".into(), vec![])
1002        );
1003        assert_eq!(
1004            attach_projection_alias(
1005                Projection::Function("LOWER:l".into(), vec![]),
1006                Some("ignored".into())
1007            ),
1008            Projection::Function("LOWER:l".into(), vec![])
1009        );
1010        assert_eq!(
1011            attach_projection_alias(Projection::Column("name".into()), Some("n".into())),
1012            Projection::Alias("name".into(), "n".into())
1013        );
1014        assert_eq!(
1015            attach_projection_alias(Projection::All, Some("ignored".into())),
1016            Projection::All
1017        );
1018
1019        assert_eq!(render_group_by_expr(&col("dept")).as_deref(), Some("dept"));
1020        assert_eq!(
1021            render_group_by_expr(&Expr::Column {
1022                field: FieldRef::TableColumn {
1023                    table: "employees".into(),
1024                    column: "dept".into()
1025                },
1026                span: Span::synthetic()
1027            })
1028            .as_deref(),
1029            Some("employees.dept")
1030        );
1031        assert_eq!(
1032            render_group_by_expr(&Expr::Column {
1033                field: FieldRef::NodeId { alias: "n".into() },
1034                span: Span::synthetic()
1035            }),
1036            Some("NodeId { alias: \"n\" }".into())
1037        );
1038        assert_eq!(
1039            render_group_by_expr(&Expr::Literal {
1040                value: Value::Null,
1041                span: Span::synthetic()
1042            })
1043            .as_deref(),
1044            Some("")
1045        );
1046        assert_eq!(
1047            render_group_by_expr(&Expr::Literal {
1048                value: Value::text("5m"),
1049                span: Span::synthetic()
1050            })
1051            .as_deref(),
1052            Some("5m")
1053        );
1054        assert_eq!(
1055            render_group_by_expr(&Expr::Literal {
1056                value: Value::Integer(7),
1057                span: Span::synthetic()
1058            })
1059            .as_deref(),
1060            Some("7")
1061        );
1062        assert_eq!(
1063            render_group_by_expr(&Expr::FunctionCall {
1064                name: "TIME_BUCKET".into(),
1065                args: vec![
1066                    col("ts"),
1067                    Expr::Literal {
1068                        value: Value::text("5m"),
1069                        span: Span::synthetic()
1070                    }
1071                ],
1072                span: Span::synthetic()
1073            })
1074            .as_deref(),
1075            Some("TIME_BUCKET(ts,5m)")
1076        );
1077        assert_eq!(
1078            render_group_by_expr(&Expr::FunctionCall {
1079                name: "LOWER".into(),
1080                args: vec![col("dept")],
1081                span: Span::synthetic()
1082            })
1083            .as_deref(),
1084            Some("LOWER()")
1085        );
1086
1087        assert_eq!(
1088            render_group_by_function_arg(&Projection::Column("LIT:5m".into())),
1089            Some("5m".into())
1090        );
1091        assert_eq!(
1092            render_group_by_function_arg(&Projection::Column("dept".into())),
1093            Some("dept".into())
1094        );
1095        assert_eq!(
1096            render_group_by_function_arg(&Projection::All),
1097            Some("*".into())
1098        );
1099        assert_eq!(
1100            render_group_by_function_arg(&Projection::Function("LOWER".into(), vec![])),
1101            None
1102        );
1103    }
1104
1105    #[test]
1106    fn expression_aggregate_detection_branches() {
1107        let count = Expr::FunctionCall {
1108            name: "COUNT".into(),
1109            args: vec![col("id")],
1110            span: Span::synthetic(),
1111        };
1112        assert!(contains_nested_aggregate(&count));
1113        assert!(is_plain_aggregate_expr(&count));
1114
1115        let nested = Expr::FunctionCall {
1116            name: "SUM".into(),
1117            args: vec![count.clone()],
1118            span: Span::synthetic(),
1119        };
1120        assert!(contains_nested_aggregate(&nested));
1121        assert!(!is_plain_aggregate_expr(&nested));
1122
1123        let binary = Expr::BinaryOp {
1124            op: BinOp::Add,
1125            lhs: Box::new(col("a")),
1126            rhs: Box::new(count.clone()),
1127            span: Span::synthetic(),
1128        };
1129        assert!(contains_nested_aggregate(&binary));
1130
1131        let unary = Expr::UnaryOp {
1132            op: UnaryOp::Not,
1133            operand: Box::new(count.clone()),
1134            span: Span::synthetic(),
1135        };
1136        assert!(contains_nested_aggregate(&unary));
1137
1138        let cast = Expr::Cast {
1139            inner: Box::new(count.clone()),
1140            target: crate::storage::schema::DataType::Integer,
1141            span: Span::synthetic(),
1142        };
1143        assert!(contains_nested_aggregate(&cast));
1144
1145        let case = Expr::Case {
1146            branches: vec![(col("flag"), count.clone())],
1147            else_: Some(Box::new(col("fallback"))),
1148            span: Span::synthetic(),
1149        };
1150        assert!(contains_nested_aggregate(&case));
1151
1152        let in_list = Expr::InList {
1153            target: Box::new(col("id")),
1154            values: vec![count.clone()],
1155            negated: false,
1156            span: Span::synthetic(),
1157        };
1158        assert!(contains_nested_aggregate(&in_list));
1159
1160        let between = Expr::Between {
1161            target: Box::new(col("id")),
1162            low: Box::new(col("low")),
1163            high: Box::new(count),
1164            negated: false,
1165            span: Span::synthetic(),
1166        };
1167        assert!(contains_nested_aggregate(&between));
1168        assert!(!contains_nested_aggregate(&Expr::Parameter {
1169            index: 1,
1170            span: Span::synthetic()
1171        }));
1172
1173        assert!(super::super::parse("SELECT SUM(COUNT(id)) FROM t").is_err());
1174    }
1175
1176    #[test]
1177    fn table_clause_parsing_covers_as_of_order_offset_and_expand() {
1178        let table = parse_table(
1179            "SELECT name FROM users AS OF COMMIT 'abc123' \
1180             WHERE deleted_at IS NULL \
1181             ORDER BY LOWER(name) ASC NULLS FIRST, created_at DESC NULLS LAST \
1182             LIMIT 10 OFFSET 5 WITH EXPAND GRAPH DEPTH 3, CROSS_REFS",
1183        );
1184        assert!(matches!(table.as_of, Some(AsOfClause::Commit(ref v)) if v == "abc123"));
1185        assert!(table.filter.is_some());
1186        assert_eq!(table.order_by.len(), 2);
1187        assert!(table.order_by[0].expr.is_some());
1188        assert!(table.order_by[0].ascending);
1189        assert!(table.order_by[0].nulls_first);
1190        assert!(!table.order_by[1].ascending);
1191        assert!(!table.order_by[1].nulls_first);
1192        assert_eq!(table.limit, Some(10));
1193        assert_eq!(table.offset, Some(5));
1194        assert!(matches!(
1195            table.expand,
1196            Some(ExpandOptions {
1197                graph: true,
1198                graph_depth: 3,
1199                cross_refs: true,
1200                ..
1201            })
1202        ));
1203
1204        let table = parse_table("SELECT * FROM users AS OF BRANCH 'main'");
1205        assert!(matches!(table.as_of, Some(AsOfClause::Branch(ref v)) if v == "main"));
1206
1207        let table = parse_table("SELECT * FROM users AS OF TAG 'v1'");
1208        assert!(matches!(table.as_of, Some(AsOfClause::Tag(ref v)) if v == "v1"));
1209
1210        let table = parse_table("SELECT * FROM users AS OF TIMESTAMP 1710000000000");
1211        assert!(matches!(
1212            table.as_of,
1213            Some(AsOfClause::TimestampMs(1_710_000_000_000))
1214        ));
1215
1216        let table = parse_table("SELECT * FROM users AS OF SNAPSHOT 42");
1217        assert!(matches!(table.as_of, Some(AsOfClause::Snapshot(42))));
1218
1219        let table = parse_table("SELECT * FROM users WITH EXPAND");
1220        assert!(matches!(
1221            table.expand,
1222            Some(ExpandOptions {
1223                graph: true,
1224                graph_depth: 1,
1225                cross_refs: true,
1226                ..
1227            })
1228        ));
1229
1230        assert!(super::super::parse("SELECT * FROM users AS OF SNAPSHOT -1").is_err());
1231        assert!(super::super::parse("SELECT * FROM users AS OF UNKNOWN 'x'").is_err());
1232    }
1233
1234    #[test]
1235    fn direct_parser_helpers_cover_projection_group_order_and_literals() {
1236        let mut parser = Parser::new("name, LOWER(email) AS email_l").unwrap();
1237        let projections = parser.parse_projection_list().unwrap();
1238        assert_eq!(projections.len(), 2);
1239
1240        let mut parser = Parser::new("dept, TIME_BUCKET(5 m)").unwrap();
1241        let group_by = parser.parse_group_by_list().unwrap();
1242        assert_eq!(group_by, vec!["dept", "TIME_BUCKET(5m)"]);
1243
1244        let mut parser = Parser::new("LOWER(name) DESC, created_at").unwrap();
1245        let order_by = parser.parse_order_by_list().unwrap();
1246        assert_eq!(order_by.len(), 2);
1247        assert!(order_by[0].expr.is_some());
1248        assert!(!order_by[0].ascending);
1249        assert!(order_by[0].nulls_first);
1250        assert!(order_by[1].ascending);
1251        assert!(!order_by[1].nulls_first);
1252
1253        let mut parser = Parser::new("-5 ms").unwrap();
1254        assert_eq!(parser.parse_function_literal_arg().unwrap(), "-5ms");
1255        let mut parser = Parser::new("2.0 H").unwrap();
1256        assert_eq!(parser.parse_function_literal_arg().unwrap(), "2h");
1257        let mut parser = Parser::new("bad").unwrap();
1258        assert!(parser.parse_function_literal_arg().is_err());
1259    }
1260
1261    #[test]
1262    fn from_subquery_source_is_preserved() {
1263        let parsed = super::super::parse("FROM (SELECT id FROM users) AS u RETURN u.id")
1264            .unwrap()
1265            .query;
1266        let QueryExpr::Table(table) = parsed else {
1267            panic!("expected table query");
1268        };
1269        assert_eq!(table.table, "__subq_u");
1270        assert_eq!(table.alias.as_deref(), Some("u"));
1271        assert!(matches!(table.source, Some(TableSource::Subquery(_))));
1272        assert_eq!(table.select_items.len(), 1);
1273
1274        assert!(super::super::parse("FROM (MATCH (n) RETURN n) AS g").is_err());
1275    }
1276
1277    // ── SESSIONIZE operator (issue #585 slice 8) ──
1278
1279    #[test]
1280    fn test_parse_sessionize_full_clause() {
1281        let q = parse_table(
1282            "SELECT user_id, ts FROM events SESSIONIZE BY user_id GAP 30 m ORDER BY ts",
1283        );
1284        let s = q.sessionize.expect("sessionize present");
1285        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1286        assert_eq!(s.gap_ms, Some(30 * 60_000));
1287        assert_eq!(s.order_col.as_deref(), Some("ts"));
1288    }
1289
1290    #[test]
1291    fn test_parse_sessionize_omits_optional_order_by() {
1292        let q = parse_table("SELECT * FROM events SESSIONIZE BY user_id GAP 5 s");
1293        let s = q.sessionize.expect("sessionize present");
1294        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1295        assert_eq!(s.gap_ms, Some(5_000));
1296        assert!(s.order_col.is_none());
1297    }
1298
1299    #[test]
1300    fn test_parse_sessionize_bare_defers_to_descriptor() {
1301        // Both BY and GAP omitted — parser accepts the shape; the
1302        // executor raises MissingSessionKey when the descriptor
1303        // doesn't supply defaults.
1304        let q = parse_table("SELECT * FROM events SESSIONIZE");
1305        let s = q.sessionize.expect("sessionize present");
1306        assert!(s.actor_col.is_none());
1307        assert!(s.gap_ms.is_none());
1308        assert!(s.order_col.is_none());
1309    }
1310
1311    #[test]
1312    fn test_parse_sessionize_composes_with_where_and_limit() {
1313        let q = parse_table(
1314            "SELECT user_id FROM events \
1315             SESSIONIZE BY user_id GAP 1 m \
1316             WHERE user_id = 'u1' LIMIT 10",
1317        );
1318        let s = q.sessionize.expect("sessionize present");
1319        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1320        assert_eq!(s.gap_ms, Some(60_000));
1321        assert!(q.where_expr.is_some(), "WHERE still parsed");
1322        assert_eq!(q.limit, Some(10));
1323    }
1324
1325    #[test]
1326    fn test_parse_sessionize_absent_leaves_field_none() {
1327        let q = parse_table("SELECT * FROM events");
1328        assert!(q.sessionize.is_none());
1329    }
1330
1331    #[test]
1332    fn test_parse_sessionize_with_session_id_in_projection_e2e_shape() {
1333        // Matches the literal shape e2e tests use — session_id in the
1334        // projection list must not confuse the parser.
1335        let q = parse_table(
1336            "SELECT id, user_id, ts, session_id FROM events \
1337             SESSIONIZE BY user_id GAP 30 s ORDER BY ts",
1338        );
1339        let s = q.sessionize.expect("sessionize present");
1340        assert_eq!(s.actor_col.as_deref(), Some("user_id"));
1341        assert_eq!(s.gap_ms, Some(30_000));
1342    }
1343}