Skip to main content

reddb_server/storage/query/parser/
table.rs

1//! Table query parsing (SELECT ... FROM ...)
2
3use super::super::ast::{
4    BinOp, CompareOp, Expr, FieldRef, Filter, OrderByClause, Projection, QueryExpr,
5    QueueSelectQuery, SelectItem, Span, TableQuery, UnaryOp,
6};
7use super::super::lexer::Token;
8use super::error::ParseError;
9use crate::storage::query::sql_lowering::{
10    expr_to_projection, filter_to_expr, select_item_to_projection,
11};
12use crate::storage::schema::Value;
13
/// Returns `true` when `name` is exactly one of the scalar function names
/// known to this parser.
///
/// The comparison is case-sensitive against upper-case spellings, so callers
/// are expected to upper-case the name first.
fn is_scalar_function(name: &str) -> bool {
    // Upper-case spellings of every recognised scalar function.
    const SCALAR_FUNCTIONS: &[&str] = &[
        // Geo helpers
        "GEO_DISTANCE",
        "GEO_DISTANCE_VINCENTY",
        "GEO_BEARING",
        "GEO_MIDPOINT",
        "HAVERSINE",
        "VINCENTY",
        // Time bucketing
        "TIME_BUCKET",
        // String helpers
        "UPPER",
        "LOWER",
        "LENGTH",
        "CHAR_LENGTH",
        "CHARACTER_LENGTH",
        "OCTET_LENGTH",
        "BIT_LENGTH",
        "SUBSTRING",
        "SUBSTR",
        "POSITION",
        "TRIM",
        "LTRIM",
        "RTRIM",
        "BTRIM",
        "CONCAT",
        "CONCAT_WS",
        "REVERSE",
        "LEFT",
        "RIGHT",
        "QUOTE_LITERAL",
        // Numeric / null handling
        "ABS",
        "ROUND",
        "COALESCE",
        // Names that are also listed as aggregates
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
        // Money helpers
        "MONEY",
        "MONEY_ASSET",
        "MONEY_MINOR",
        "MONEY_SCALE",
        // Miscellaneous
        "VERIFY_PASSWORD",
        "CAST",
        "CASE",
    ];

    SCALAR_FUNCTIONS.contains(&name)
}
66
/// Returns `true` when `name` is exactly one of the aggregate function names
/// known to this parser.
///
/// The comparison is case-sensitive against upper-case spellings, so callers
/// are expected to upper-case the name first.
fn is_aggregate_function(name: &str) -> bool {
    // Upper-case spellings of every recognised aggregate function.
    const AGGREGATE_FUNCTIONS: &[&str] = &[
        "COUNT",
        "AVG",
        "SUM",
        "MIN",
        "MAX",
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
    ];

    AGGREGATE_FUNCTIONS.contains(&name)
}
87
88fn aggregate_token_name(token: &Token) -> Option<&'static str> {
89    match token {
90        Token::Count => Some("COUNT"),
91        Token::Sum => Some("SUM"),
92        Token::Avg => Some("AVG"),
93        Token::Min => Some("MIN"),
94        Token::Max => Some("MAX"),
95        Token::First => Some("FIRST"),
96        Token::Last => Some("LAST"),
97        _ => None,
98    }
99}
100
101fn scalar_token_name(token: &Token) -> Option<&'static str> {
102    match token {
103        Token::Left => Some("LEFT"),
104        Token::Right => Some("RIGHT"),
105        _ => None,
106    }
107}
108use super::Parser;
109
110impl<'a> Parser<'a> {
111    /// Parse SELECT ... FROM ... query
112    pub fn parse_select_query(&mut self) -> Result<QueryExpr, ParseError> {
113        // Recursion guard: nested subqueries (UNION, derived tables,
114        // EXISTS) re-enter through this point, so depth here bounds
115        // the SELECT-shaped recursion in addition to the expr Pratt
116        // climb guarded in `parse_expr_prec`.
117        self.enter_depth()?;
118        let result = self.parse_select_query_inner();
119        self.exit_depth();
120        result
121    }
122
123    fn parse_select_query_inner(&mut self) -> Result<QueryExpr, ParseError> {
124        self.expect(Token::Select)?;
125
126        // Parse column list
127        let (select_items, columns) = self.parse_select_items_and_projections()?;
128
129        // Parse optional table source. If omitted, default to `ANY` so the query
130        // can return mixed entities (table, document, graph, and vector) by default.
131        let has_from = self.consume(&Token::From)?;
132        let table = if has_from {
133            if self.consume(&Token::Queue)? {
134                let queue = self.expect_ident()?;
135                let filter = if self.consume(&Token::Where)? {
136                    Some(self.parse_filter()?)
137                } else {
138                    None
139                };
140                let limit = if self.consume(&Token::Limit)? {
141                    Some(self.parse_integer()? as u64)
142                } else {
143                    None
144                };
145                return Ok(QueryExpr::QueueSelect(QueueSelectQuery {
146                    queue,
147                    columns: queue_projection_columns(&columns)?,
148                    filter,
149                    limit,
150                }));
151            } else if self.consume(&Token::Star)? {
152                "*".to_string()
153            } else if self.consume(&Token::All)? {
154                "all".to_string()
155            } else {
156                self.expect_ident()?
157            }
158        } else {
159            "any".to_string()
160        };
161
162        // Parse optional alias (only when a FROM clause exists).
163        // `AS OF` is a clause — don't gobble the `AS` as an alias
164        // marker when the following token is `OF`.
165        let alias =
166            if !has_from || (self.check(&Token::As) && matches!(self.peek_next()?, Token::Of)) {
167                None
168            } else if self.consume(&Token::As)?
169                || (self.check(&Token::Ident("".into())) && !self.is_clause_keyword())
170            {
171                Some(self.expect_ident()?)
172            } else {
173                None
174            };
175
176        let mut query = TableQuery {
177            table,
178            source: None,
179            alias,
180            select_items,
181            columns,
182            where_expr: None,
183            filter: None,
184            group_by_exprs: Vec::new(),
185            group_by: Vec::new(),
186            having_expr: None,
187            having: None,
188            order_by: Vec::new(),
189            limit: None,
190            limit_param: None,
191            offset: None,
192            offset_param: None,
193            expand: None,
194            as_of: None,
195            sessionize: None,
196        };
197
198        if self.is_join_keyword() {
199            let return_items = std::mem::take(&mut query.select_items);
200            let return_ = std::mem::take(&mut query.columns);
201            let mut expr = self.parse_join_query(QueryExpr::Table(query))?;
202            if let QueryExpr::Join(join) = &mut expr {
203                join.return_items = return_items;
204                join.return_ = return_;
205            }
206            return Ok(expr);
207        }
208
209        // SESSIONIZE BY <ident> GAP <duration> [ORDER BY <ident>]
210        // — issue #585 slice 8. Parsed before WHERE/GROUP BY so the
211        // optional inner ORDER BY (which the user binds to the
212        // operator's timestamp axis) cannot be confused with the
213        // SELECT's top-level ORDER BY further down. Both `BY` and
214        // `GAP` may be omitted when the source collection's
215        // descriptor carries `SESSION_KEY` / `SESSION_GAP` defaults
216        // (slice 1) — the executor resolves them at run time and
217        // raises `MissingSessionKey` if neither side supplies a
218        // value.
219        if self.consume(&Token::Sessionize)? {
220            query.sessionize = Some(self.parse_sessionize_clause()?);
221        }
222
223        // Parse optional clauses
224        self.parse_table_clauses(&mut query)?;
225
226        Ok(QueryExpr::Table(query))
227    }
228
229    fn parse_sessionize_clause(
230        &mut self,
231    ) -> Result<crate::storage::query::ast::SessionizeClause, ParseError> {
232        use crate::storage::query::ast::SessionizeClause;
233
234        let mut clause = SessionizeClause::default();
235
236        if self.consume(&Token::By)? {
237            clause.actor_col = Some(self.expect_ident()?);
238        }
239        if self.consume(&Token::Gap)? {
240            let value = self.parse_float()?;
241            let unit = self.parse_duration_unit()?;
242            clause.gap_ms = Some((value * unit) as u64);
243        }
244        // Optional `ORDER BY <ident>` immediately after GAP. The
245        // top-level SELECT ORDER BY parsed by `parse_table_clauses`
246        // sees the next ORDER token, so this only consumes the one
247        // immediately attached to SESSIONIZE.
248        if self.consume(&Token::Order)? {
249            self.expect(Token::By)?;
250            clause.order_col = Some(self.expect_ident()?);
251        }
252        Ok(clause)
253    }
254}
255
256impl<'a> Parser<'a> {
257    /// Check if current identifier is a clause keyword
258    pub fn is_clause_keyword(&self) -> bool {
259        matches!(
260            self.peek(),
261            Token::Where
262                | Token::Order
263                | Token::Limit
264                | Token::Offset
265                | Token::Join
266                | Token::Inner
267                | Token::Left
268                | Token::Right
269                | Token::As
270                | Token::Sessionize
271        )
272    }
273
274    /// Parse projection list (column selections)
275    pub fn parse_projection_list(&mut self) -> Result<Vec<Projection>, ParseError> {
276        Ok(self.parse_select_items_and_projections()?.1)
277    }
278
279    pub(crate) fn parse_select_items_and_projections(
280        &mut self,
281    ) -> Result<(Vec<SelectItem>, Vec<Projection>), ParseError> {
282        // Handle SELECT *
283        if self.consume(&Token::Star)? {
284            return Ok((vec![SelectItem::Wildcard], Vec::new())); // Empty legacy vec means all columns
285        }
286
287        let mut select_items = Vec::new();
288        let mut projections = Vec::new();
289        loop {
290            let (item, proj) = self.parse_projection()?;
291            select_items.push(item);
292            projections.push(proj);
293
294            if !self.consume(&Token::Comma)? {
295                break;
296            }
297        }
298        Ok((select_items, projections))
299    }
300
301    /// Parse a single projection — supports columns, aggregate functions, and scalar functions
302    fn parse_projection(&mut self) -> Result<(SelectItem, Projection), ParseError> {
303        let expr = self.parse_expr()?;
304        if contains_nested_aggregate(&expr) && !is_plain_aggregate_expr(&expr) {
305            return Err(ParseError::new(
306                "aggregate function is not valid inside another expression".to_string(),
307                self.position(),
308            ));
309        }
310        let alias = if self.consume(&Token::As)? {
311            Some(self.expect_column_ident()?)
312        } else {
313            None
314        };
315        let select_item = SelectItem::Expr {
316            expr: expr.clone(),
317            alias: alias.clone(),
318        };
319        let projection = select_item_to_projection(&select_item).ok_or_else(|| {
320            ParseError::new(
321                "projection cannot yet be lowered to legacy runtime representation".to_string(),
322                self.position(),
323            )
324        })?;
325        Ok((select_item, projection))
326    }
327}
328
329fn contains_nested_aggregate(expr: &Expr) -> bool {
330    match expr {
331        Expr::FunctionCall { name, args, .. } => {
332            is_aggregate_function(&name.to_uppercase())
333                || args.iter().any(contains_nested_aggregate)
334        }
335        Expr::BinaryOp { lhs, rhs, .. } => {
336            contains_nested_aggregate(lhs) || contains_nested_aggregate(rhs)
337        }
338        Expr::UnaryOp { operand, .. } | Expr::IsNull { operand, .. } => {
339            contains_nested_aggregate(operand)
340        }
341        Expr::Cast { inner, .. } => contains_nested_aggregate(inner),
342        Expr::Case {
343            branches, else_, ..
344        } => {
345            branches.iter().any(|(cond, value)| {
346                contains_nested_aggregate(cond) || contains_nested_aggregate(value)
347            }) || else_.as_deref().is_some_and(contains_nested_aggregate)
348        }
349        Expr::InList { target, values, .. } => {
350            contains_nested_aggregate(target) || values.iter().any(contains_nested_aggregate)
351        }
352        Expr::Between {
353            target, low, high, ..
354        } => {
355            contains_nested_aggregate(target)
356                || contains_nested_aggregate(low)
357                || contains_nested_aggregate(high)
358        }
359        Expr::Literal { .. }
360        | Expr::Column { .. }
361        | Expr::Parameter { .. }
362        | Expr::Subquery { .. } => false,
363    }
364}
365
366fn is_plain_aggregate_expr(expr: &Expr) -> bool {
367    match expr {
368        Expr::FunctionCall { name, args, .. } if is_aggregate_function(&name.to_uppercase()) => {
369            !args.iter().any(contains_nested_aggregate)
370        }
371        _ => false,
372    }
373}
374
375fn attach_projection_alias(proj: Projection, alias: Option<String>) -> Projection {
376    let Some(alias) = alias else { return proj };
377    match proj {
378        Projection::Field(field, _) => Projection::Field(field, Some(alias)),
379        Projection::Expression(filter, _) => Projection::Expression(filter, Some(alias)),
380        Projection::Function(name, args) => {
381            if name.contains(':') {
382                Projection::Function(name, args)
383            } else {
384                Projection::Function(format!("{name}:{alias}"), args)
385            }
386        }
387        Projection::Column(column) => Projection::Alias(column, alias),
388        other => other,
389    }
390}
391
392fn queue_projection_columns(columns: &[Projection]) -> Result<Vec<String>, ParseError> {
393    let mut out = Vec::new();
394    for column in columns {
395        match column {
396            Projection::Column(name) => out.push(name.clone()),
397            Projection::Alias(name, _) => out.push(name.clone()),
398            Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
399                out.push(column.clone());
400            }
401            Projection::All => return Ok(Vec::new()),
402            other => {
403                return Err(ParseError::new(
404                    format!(
405                        "unsupported SELECT FROM QUEUE projection {other:?}; use `SELECT *` or bare column names, or use queue verbs (PUSH, POP, PEEK, LEN, ACK, NACK, …) for queue operations"
406                    ),
407                    crate::storage::query::lexer::Position::default(),
408                ));
409            }
410        }
411    }
412    Ok(out)
413}
414
415impl<'a> Parser<'a> {
416    /// Parse table query clauses (AS OF, WHERE, GROUP BY, HAVING, ORDER BY, LIMIT, OFFSET)
417    pub fn parse_table_clauses(&mut self, query: &mut TableQuery) -> Result<(), ParseError> {
418        // AS OF clause — time-travel anchor. Must come before WHERE
419        // so the executor can bind the snapshot before filter eval.
420        if self.check(&Token::As) {
421            let next_is_of = matches!(self.peek_next()?, Token::Of);
422            if next_is_of {
423                self.expect(Token::As)?;
424                self.expect(Token::Of)?;
425                query.as_of = Some(self.parse_as_of_spec()?);
426            }
427        }
428
429        // WHERE clause
430        if self.consume(&Token::Where)? {
431            let filter = self.parse_filter()?;
432            query.where_expr = Some(filter_to_expr(&filter));
433            query.filter = Some(filter);
434        }
435
436        // GROUP BY clause
437        if self.consume(&Token::Group)? {
438            self.expect(Token::By)?;
439            let (group_by_exprs, group_by) = self.parse_group_by_items()?;
440            query.group_by_exprs = group_by_exprs;
441            query.group_by = group_by;
442        }
443
444        // HAVING clause (only valid after GROUP BY)
445        if !query.group_by_exprs.is_empty() && self.consume_ident_ci("HAVING")? {
446            let having = self.parse_filter()?;
447            query.having_expr = Some(filter_to_expr(&having));
448            query.having = Some(having);
449        }
450
451        // ORDER BY clause
452        if self.consume(&Token::Order)? {
453            self.expect(Token::By)?;
454            query.order_by = self.parse_order_by_list()?;
455        }
456
457        // LIMIT clause
458        if self.consume(&Token::Limit)? {
459            if matches!(self.peek(), Token::Dollar | Token::Question) {
460                query.limit_param = Some(self.parse_param_slot("LIMIT")?);
461                query.limit = None;
462            } else {
463                query.limit = Some(self.parse_integer()? as u64);
464            }
465        }
466
467        // OFFSET clause
468        if self.consume(&Token::Offset)? {
469            if matches!(self.peek(), Token::Dollar | Token::Question) {
470                query.offset_param = Some(self.parse_param_slot("OFFSET")?);
471                query.offset = None;
472            } else {
473                query.offset = Some(self.parse_integer()? as u64);
474            }
475        }
476
477        // WITH EXPAND clause
478        if self.consume(&Token::With)? && self.consume_ident_ci("EXPAND")? {
479            query.expand = Some(self.parse_expand_options()?);
480        }
481
482        Ok(())
483    }
484
485    /// Parse an AS OF spec after `AS OF` has already been consumed.
486    /// Grammar:
487    ///   AS OF COMMIT   '<hex>'
488    ///   AS OF BRANCH   '<name>'
489    ///   AS OF TAG      '<name>'
490    ///   AS OF TIMESTAMP <integer-ms>
491    ///   AS OF SNAPSHOT  <xid>
492    fn parse_as_of_spec(&mut self) -> Result<crate::storage::query::ast::AsOfClause, ParseError> {
493        use crate::storage::query::ast::AsOfClause;
494
495        // Keyword — accept both tokenized forms (e.g. Token::Commit
496        // if present) and bare identifiers for flexibility.
497        let keyword = match self.peek() {
498            Token::Ident(s) => {
499                let s = s.to_ascii_uppercase();
500                self.advance()?;
501                s
502            }
503            Token::Commit => {
504                self.advance()?;
505                "COMMIT".to_string()
506            }
507            other => {
508                return Err(ParseError::expected(
509                    vec!["COMMIT", "BRANCH", "TAG", "TIMESTAMP", "SNAPSHOT"],
510                    other,
511                    self.position(),
512                ));
513            }
514        };
515
516        match keyword.as_str() {
517            "COMMIT" => {
518                let value = self.parse_string()?;
519                Ok(AsOfClause::Commit(value))
520            }
521            "BRANCH" => {
522                let value = self.parse_string()?;
523                Ok(AsOfClause::Branch(value))
524            }
525            "TAG" => {
526                let value = self.parse_string()?;
527                Ok(AsOfClause::Tag(value))
528            }
529            "TIMESTAMP" => {
530                let value = self.parse_integer()?;
531                Ok(AsOfClause::TimestampMs(value))
532            }
533            "SNAPSHOT" => {
534                let value = self.parse_integer()?;
535                if value < 0 {
536                    return Err(ParseError::new(
537                        "AS OF SNAPSHOT requires non-negative xid".to_string(),
538                        self.position(),
539                    ));
540                }
541                Ok(AsOfClause::Snapshot(value as u64))
542            }
543            other => Err(ParseError::expected(
544                vec!["COMMIT", "BRANCH", "TAG", "TIMESTAMP", "SNAPSHOT"],
545                &Token::Ident(other.into()),
546                self.position(),
547            )),
548        }
549    }
550
551    /// Parse EXPAND options: GRAPH [DEPTH n], CROSS_REFS, ALL
552    fn parse_expand_options(
553        &mut self,
554    ) -> Result<crate::storage::query::ast::ExpandOptions, ParseError> {
555        use crate::storage::query::ast::ExpandOptions;
556        let mut opts = ExpandOptions::default();
557
558        loop {
559            if self.consume(&Token::Graph)? || self.consume_ident_ci("GRAPH")? {
560                opts.graph = true;
561                opts.graph_depth = if self.consume(&Token::Depth)? {
562                    self.parse_integer()? as usize
563                } else {
564                    1
565                };
566            } else if self.consume_ident_ci("CROSS_REFS")?
567                || self.consume_ident_ci("CROSSREFS")?
568                || self.consume_ident_ci("REFS")?
569            {
570                opts.cross_refs = true;
571            } else if self.consume(&Token::All)? || self.consume_ident_ci("ALL")? {
572                opts.graph = true;
573                opts.cross_refs = true;
574                opts.graph_depth = 1;
575            } else {
576                break;
577            }
578            if !self.consume(&Token::Comma)? {
579                break;
580            }
581        }
582
583        if !opts.graph && !opts.cross_refs {
584            opts.graph = true;
585            opts.cross_refs = true;
586            opts.graph_depth = 1;
587        }
588
589        Ok(opts)
590    }
591
592    /// Parse GROUP BY field list
593    pub fn parse_group_by_list(&mut self) -> Result<Vec<String>, ParseError> {
594        Ok(self.parse_group_by_items()?.1)
595    }
596
597    fn parse_group_by_items(&mut self) -> Result<(Vec<Expr>, Vec<String>), ParseError> {
598        let mut exprs = Vec::new();
599        let mut fields = Vec::new();
600        loop {
601            let expr = self.parse_expr()?;
602            let rendered = render_group_by_expr(&expr).ok_or_else(|| {
603                ParseError::new(
604                    "GROUP BY expression cannot yet be lowered to legacy runtime representation"
605                        .to_string(),
606                    self.position(),
607                )
608            })?;
609            exprs.push(expr);
610            fields.push(rendered);
611            if !self.consume(&Token::Comma)? {
612                break;
613            }
614        }
615        Ok((exprs, fields))
616    }
617
618    /// Parse ORDER BY list.
619    ///
620    /// Fase 1.6 unlock: uses the new `Expr` Pratt parser so
621    /// `ORDER BY CAST(age AS INT)`, `ORDER BY a + b * 2`,
622    /// `ORDER BY last_seen - created_at` all parse cleanly. If the
623    /// parsed expression is a bare `Column`, we store it in the
624    /// legacy `field` slot and leave `expr` None so downstream
625    /// consumers (planner cost, mode translators) keep using the
626    /// fast path. Otherwise we stash the full tree in `expr` and
627    /// populate `field` with a synthetic marker that runtime code
628    /// never touches.
629    pub fn parse_order_by_list(&mut self) -> Result<Vec<OrderByClause>, ParseError> {
630        use super::super::ast::Expr as AstExpr;
631        let mut clauses = Vec::new();
632        loop {
633            let parsed = self.parse_expr()?;
634            let (field, expr_slot) = match parsed {
635                AstExpr::Column { field, .. } => (field, None),
636                other => (
637                    // Synthetic placeholder so legacy pattern-matches
638                    // on `OrderByClause.field` still destructure.
639                    // Runtime comparators check `expr` first when set,
640                    // so the sentinel never gets resolved against a
641                    // real record.
642                    FieldRef::TableColumn {
643                        table: String::new(),
644                        column: String::new(),
645                    },
646                    Some(other),
647                ),
648            };
649
650            let ascending = if self.consume(&Token::Desc)? {
651                false
652            } else {
653                self.consume(&Token::Asc)?;
654                true
655            };
656
657            let nulls_first = if self.consume(&Token::Nulls)? {
658                if self.consume(&Token::First)? {
659                    true
660                } else {
661                    self.expect(Token::Last)?;
662                    false
663                }
664            } else {
665                !ascending // Default: nulls last for ASC, first for DESC
666            };
667
668            clauses.push(OrderByClause {
669                field,
670                expr: expr_slot,
671                ascending,
672                nulls_first,
673            });
674
675            if !self.consume(&Token::Comma)? {
676                break;
677            }
678        }
679        Ok(clauses)
680    }
681
682    fn parse_function_literal_arg(&mut self) -> Result<String, ParseError> {
683        let negative = self.consume(&Token::Dash)?;
684        let mut literal = match self.advance()? {
685            Token::Integer(n) => {
686                if negative {
687                    format!("-{n}")
688                } else {
689                    n.to_string()
690                }
691            }
692            Token::Float(n) => {
693                let value = if negative { -n } else { n };
694                if value.fract().abs() < f64::EPSILON {
695                    format!("{}", value as i64)
696                } else {
697                    value.to_string()
698                }
699            }
700            other => {
701                return Err(ParseError::new(
702                    // F-05: `other` is a `Token` whose Display arms emit raw
703                    // user bytes for `Ident` / `String` / `JsonLiteral`.
704                    // Render via `{:?}` so CR/LF/NUL/quotes are escaped
705                    // before the message reaches downstream serialization
706                    // sinks.
707                    format!("expected number, got {:?}", other),
708                    self.position(),
709                ));
710            }
711        };
712
713        if let Token::Ident(unit) = self.peek().clone() {
714            if is_duration_unit(&unit) {
715                self.advance()?;
716                literal.push_str(&unit.to_ascii_lowercase());
717            }
718        }
719
720        Ok(literal)
721    }
722}
723
/// Returns `true` when `unit` is a recognised duration unit spelling,
/// compared ASCII case-insensitively.
///
/// Recognised families: milliseconds, seconds, minutes, hours, and days.
fn is_duration_unit(unit: &str) -> bool {
    // All spellings are lower-case ASCII; the comparison ignores ASCII case.
    const DURATION_UNITS: &[&str] = &[
        // milliseconds
        "ms",
        "msec",
        "millisecond",
        "milliseconds",
        // seconds
        "s",
        "sec",
        "secs",
        "second",
        "seconds",
        // minutes
        "m",
        "min",
        "mins",
        "minute",
        "minutes",
        // hours
        "h",
        "hr",
        "hrs",
        "hour",
        "hours",
        // days
        "d",
        "day",
        "days",
    ];

    DURATION_UNITS
        .iter()
        .any(|known| unit.eq_ignore_ascii_case(known))
}
750
751fn render_group_by_expr(expr: &Expr) -> Option<String> {
752    match expr {
753        Expr::Column { field, .. } => match field {
754            FieldRef::TableColumn { table, column } if table.is_empty() => Some(column.clone()),
755            FieldRef::TableColumn { table, column } => Some(format!("{table}.{column}")),
756            other => Some(format!("{other:?}")),
757        },
758        Expr::FunctionCall { name, args, .. } if name.eq_ignore_ascii_case("TIME_BUCKET") => {
759            let rendered = args
760                .iter()
761                .map(render_group_by_expr)
762                .collect::<Option<Vec<_>>>()?;
763            Some(format!("TIME_BUCKET({})", rendered.join(",")))
764        }
765        Expr::Literal { value, .. } => Some(match value {
766            Value::Null => String::new(),
767            Value::Text(text) => text.to_string(),
768            other => other.to_string(),
769        }),
770        _ => expr_to_projection(expr).map(|projection| match projection {
771            Projection::Field(FieldRef::TableColumn { table, column }, _) if table.is_empty() => {
772                column
773            }
774            Projection::Field(FieldRef::TableColumn { table, column }, _) => {
775                format!("{table}.{column}")
776            }
777            Projection::Function(name, args) => {
778                let rendered = args
779                    .iter()
780                    .map(render_group_by_function_arg)
781                    .collect::<Option<Vec<_>>>()
782                    .unwrap_or_default();
783                format!(
784                    "{}({})",
785                    name.split(':').next().unwrap_or(&name),
786                    rendered.join(",")
787                )
788            }
789            Projection::Column(column) | Projection::Alias(column, _) => column,
790            Projection::All => "*".to_string(),
791            Projection::Expression(_, _) => "expr".to_string(),
792            Projection::Field(other, _) => format!("{other:?}"),
793        }),
794    }
795}
796
797fn render_group_by_function_arg(arg: &Projection) -> Option<String> {
798    match arg {
799        Projection::Column(col) => Some(
800            col.strip_prefix("LIT:")
801                .map(str::to_string)
802                .unwrap_or_else(|| col.clone()),
803        ),
804        Projection::All => Some("*".to_string()),
805        _ => None,
806    }
807}
808
809#[cfg(test)]
810mod tests {
811    use super::*;
812    use crate::storage::query::ast::{AsOfClause, BinOp, CompareOp, ExpandOptions, TableSource};
813
814    fn parse_table(sql: &str) -> TableQuery {
815        let parsed = super::super::parse(sql).unwrap().query;
816        let QueryExpr::Table(table) = parsed else {
817            panic!("expected table query");
818        };
819        table
820    }
821
822    fn col(name: &str) -> Expr {
823        Expr::Column {
824            field: FieldRef::TableColumn {
825                table: String::new(),
826                column: name.to_string(),
827            },
828            span: Span::synthetic(),
829        }
830    }
831
/// Every catalogued scalar name, aggregate name, keyword token and duration
/// unit must be accepted by its helper, and one unknown input per helper must
/// be rejected.
#[test]
fn helper_function_catalogs_cover_all_names() {
    const SCALAR_NAMES: &[&str] = &[
        "GEO_DISTANCE",
        "GEO_DISTANCE_VINCENTY",
        "GEO_BEARING",
        "GEO_MIDPOINT",
        "HAVERSINE",
        "VINCENTY",
        "TIME_BUCKET",
        "UPPER",
        "LOWER",
        "LENGTH",
        "CHAR_LENGTH",
        "CHARACTER_LENGTH",
        "OCTET_LENGTH",
        "BIT_LENGTH",
        "SUBSTRING",
        "SUBSTR",
        "POSITION",
        "TRIM",
        "LTRIM",
        "RTRIM",
        "BTRIM",
        "CONCAT",
        "CONCAT_WS",
        "REVERSE",
        "LEFT",
        "RIGHT",
        "QUOTE_LITERAL",
        "ABS",
        "ROUND",
        "COALESCE",
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
        "MONEY",
        "MONEY_ASSET",
        "MONEY_MINOR",
        "MONEY_SCALE",
        "VERIFY_PASSWORD",
        "CAST",
        "CASE",
    ];
    const AGGREGATE_NAMES: &[&str] = &[
        "COUNT",
        "AVG",
        "SUM",
        "MIN",
        "MAX",
        "STDDEV",
        "VARIANCE",
        "MEDIAN",
        "PERCENTILE",
        "GROUP_CONCAT",
        "STRING_AGG",
        "FIRST",
        "LAST",
        "ARRAY_AGG",
        "COUNT_DISTINCT",
    ];
    const DURATION_UNITS: &[&str] = &[
        "ms",
        "msec",
        "millisecond",
        "milliseconds",
        "s",
        "sec",
        "secs",
        "second",
        "seconds",
        "m",
        "min",
        "mins",
        "minute",
        "minutes",
        "h",
        "hr",
        "hrs",
        "hour",
        "hours",
        "d",
        "day",
        "days",
    ];

    // Scalar function catalog.
    for &name in SCALAR_NAMES {
        assert!(is_scalar_function(name), "{name}");
    }
    assert!(!is_scalar_function("NOT_A_FUNCTION"));

    // Aggregate function catalog.
    for &name in AGGREGATE_NAMES {
        assert!(is_aggregate_function(name), "{name}");
    }
    assert!(!is_aggregate_function("LOWER"));

    // Keyword tokens map to aggregate names; a plain identifier does not.
    let aggregate_tokens = [
        (Token::Count, Some("COUNT")),
        (Token::Sum, Some("SUM")),
        (Token::Avg, Some("AVG")),
        (Token::Min, Some("MIN")),
        (Token::Max, Some("MAX")),
        (Token::First, Some("FIRST")),
        (Token::Last, Some("LAST")),
        (Token::Ident("COUNT".into()), None),
    ];
    for (token, expected) in &aggregate_tokens {
        assert_eq!(aggregate_token_name(token), *expected);
    }

    // Keyword tokens map to scalar names; a plain identifier does not.
    let scalar_tokens = [
        (Token::Left, Some("LEFT")),
        (Token::Right, Some("RIGHT")),
        (Token::Ident("LEFT".into()), None),
    ];
    for (token, expected) in &scalar_tokens {
        assert_eq!(scalar_token_name(token), *expected);
    }

    // Duration unit spellings.
    for &unit in DURATION_UNITS {
        assert!(is_duration_unit(unit), "{unit}");
    }
    assert!(!is_duration_unit("fortnight"));
}
949
/// Exercises alias attachment for each projection shape and the GROUP BY
/// rendering helpers for columns, literals, function calls and function
/// arguments.
#[test]
fn projection_and_group_render_helpers_cover_aliases_and_exprs() {
    let name_field = FieldRef::TableColumn {
        table: String::new(),
        column: "name".into(),
    };
    let name_is_alice = Filter::Compare {
        field: name_field.clone(),
        op: CompareOp::Eq,
        value: Value::text("alice"),
    };

    // ── attach_projection_alias ──

    let aliased_field = attach_projection_alias(
        Projection::Field(name_field.clone(), None),
        Some("n".into()),
    );
    assert_eq!(
        aliased_field,
        Projection::Field(name_field.clone(), Some("n".into()))
    );

    let aliased_expression = attach_projection_alias(
        Projection::Expression(Box::new(name_is_alice.clone()), None),
        Some("ok".into()),
    );
    assert_eq!(
        aliased_expression,
        Projection::Expression(Box::new(name_is_alice), Some("ok".into()))
    );

    // Function projections: (input name, requested alias, expected name).
    for (function, alias, expected) in [
        ("LOWER", "l", "LOWER:l"),
        ("LOWER:l", "ignored", "LOWER:l"),
    ] {
        assert_eq!(
            attach_projection_alias(
                Projection::Function(function.into(), vec![]),
                Some(alias.into())
            ),
            Projection::Function(expected.into(), vec![])
        );
    }

    let aliased_column =
        attach_projection_alias(Projection::Column("name".into()), Some("n".into()));
    assert_eq!(aliased_column, Projection::Alias("name".into(), "n".into()));

    let aliased_star = attach_projection_alias(Projection::All, Some("ignored".into()));
    assert_eq!(aliased_star, Projection::All);

    // ── render_group_by_expr ──

    let literal = |value: Value| Expr::Literal {
        value,
        span: Span::synthetic(),
    };
    let call = |name: &str, args: Vec<Expr>| Expr::FunctionCall {
        name: name.into(),
        args,
        span: Span::synthetic(),
    };
    let render = |expr: Expr| render_group_by_expr(&expr);

    assert_eq!(render(col("dept")).as_deref(), Some("dept"));

    let qualified = Expr::Column {
        field: FieldRef::TableColumn {
            table: "employees".into(),
            column: "dept".into(),
        },
        span: Span::synthetic(),
    };
    assert_eq!(render(qualified).as_deref(), Some("employees.dept"));

    let node_id = Expr::Column {
        field: FieldRef::NodeId { alias: "n".into() },
        span: Span::synthetic(),
    };
    assert_eq!(render(node_id).as_deref(), Some("NodeId { alias: \"n\" }"));

    // Literal values and their expected renderings.
    for (value, expected) in [
        (Value::Null, ""),
        (Value::text("5m"), "5m"),
        (Value::Integer(7), "7"),
    ] {
        assert_eq!(render(literal(value)).as_deref(), Some(expected));
    }

    let time_bucket = call("TIME_BUCKET", vec![col("ts"), literal(Value::text("5m"))]);
    assert_eq!(render(time_bucket).as_deref(), Some("TIME_BUCKET(ts,5m)"));

    let lower = call("LOWER", vec![col("dept")]);
    assert_eq!(render(lower).as_deref(), Some("LOWER()"));

    // ── render_group_by_function_arg ──

    for (projection, expected) in [
        (Projection::Column("LIT:5m".into()), Some("5m")),
        (Projection::Column("dept".into()), Some("dept")),
        (Projection::All, Some("*")),
        (Projection::Function("LOWER".into(), vec![]), None),
    ] {
        assert_eq!(
            render_group_by_function_arg(&projection).as_deref(),
            expected
        );
    }
}
1081
/// Walks one expression of each supported shape through the aggregate
/// detection helpers, then checks that the parser rejects `SUM(COUNT(id))`.
#[test]
fn expression_aggregate_detection_branches() {
    let call = |name: &str, args: Vec<Expr>| Expr::FunctionCall {
        name: name.into(),
        args,
        span: Span::synthetic(),
    };

    let count = call("COUNT", vec![col("id")]);
    assert!(contains_nested_aggregate(&count));
    assert!(is_plain_aggregate_expr(&count));

    let sum_of_count = call("SUM", vec![count.clone()]);
    assert!(contains_nested_aggregate(&sum_of_count));
    assert!(!is_plain_aggregate_expr(&sum_of_count));

    // Binary operator with the aggregate on the right-hand side.
    assert!(contains_nested_aggregate(&Expr::BinaryOp {
        op: BinOp::Add,
        lhs: Box::new(col("a")),
        rhs: Box::new(count.clone()),
        span: Span::synthetic(),
    }));

    // Unary operator over the aggregate.
    assert!(contains_nested_aggregate(&Expr::UnaryOp {
        op: UnaryOp::Not,
        operand: Box::new(count.clone()),
        span: Span::synthetic(),
    }));

    // Cast around the aggregate.
    assert!(contains_nested_aggregate(&Expr::Cast {
        inner: Box::new(count.clone()),
        target: crate::storage::schema::DataType::Integer,
        span: Span::synthetic(),
    }));

    // CASE whose branch result is the aggregate.
    assert!(contains_nested_aggregate(&Expr::Case {
        branches: vec![(col("flag"), count.clone())],
        else_: Some(Box::new(col("fallback"))),
        span: Span::synthetic(),
    }));

    // IN list containing the aggregate.
    assert!(contains_nested_aggregate(&Expr::InList {
        target: Box::new(col("id")),
        values: vec![count.clone()],
        negated: false,
        span: Span::synthetic(),
    }));

    // BETWEEN with the aggregate as the upper bound.
    assert!(contains_nested_aggregate(&Expr::Between {
        target: Box::new(col("id")),
        low: Box::new(col("low")),
        high: Box::new(count),
        negated: false,
        span: Span::synthetic(),
    }));

    // A bare parameter is not reported as containing an aggregate.
    let parameter = Expr::Parameter {
        index: 1,
        span: Span::synthetic(),
    };
    assert!(!contains_nested_aggregate(&parameter));

    let nested_sql = "SELECT SUM(COUNT(id)) FROM t";
    assert!(super::super::parse(nested_sql).is_err());
}
1152
/// Parses a query using AS OF, WHERE, ORDER BY, LIMIT/OFFSET and WITH EXPAND
/// together, then checks each AS OF variant, the bare WITH EXPAND form, and
/// two AS OF inputs that must fail to parse.
#[test]
fn table_clause_parsing_covers_as_of_order_offset_and_expand() {
    let full = parse_table(
        "SELECT name FROM users AS OF COMMIT 'abc123' \
         WHERE deleted_at IS NULL \
         ORDER BY LOWER(name) ASC NULLS FIRST, created_at DESC NULLS LAST \
         LIMIT 10 OFFSET 5 WITH EXPAND GRAPH DEPTH 3, CROSS_REFS",
    );
    assert!(matches!(&full.as_of, Some(AsOfClause::Commit(hash)) if hash == "abc123"));
    assert!(full.filter.is_some());

    assert_eq!(full.order_by.len(), 2);
    let (by_lower_name, by_created_at) = (&full.order_by[0], &full.order_by[1]);
    assert!(by_lower_name.expr.is_some());
    assert!(by_lower_name.ascending);
    assert!(by_lower_name.nulls_first);
    assert!(!by_created_at.ascending);
    assert!(!by_created_at.nulls_first);

    assert_eq!(full.limit, Some(10));
    assert_eq!(full.offset, Some(5));
    assert!(matches!(
        &full.expand,
        Some(ExpandOptions {
            graph: true,
            graph_depth: 3,
            cross_refs: true,
            ..
        })
    ));

    // Remaining AS OF variants, one query each.
    let as_of = |sql: &str| parse_table(sql).as_of;
    assert!(matches!(
        as_of("SELECT * FROM users AS OF BRANCH 'main'"),
        Some(AsOfClause::Branch(ref branch)) if branch == "main"
    ));
    assert!(matches!(
        as_of("SELECT * FROM users AS OF TAG 'v1'"),
        Some(AsOfClause::Tag(ref tag)) if tag == "v1"
    ));
    assert!(matches!(
        as_of("SELECT * FROM users AS OF TIMESTAMP 1710000000000"),
        Some(AsOfClause::TimestampMs(1_710_000_000_000))
    ));
    assert!(matches!(
        as_of("SELECT * FROM users AS OF SNAPSHOT 42"),
        Some(AsOfClause::Snapshot(42))
    ));

    // WITH EXPAND without options.
    let bare_expand = parse_table("SELECT * FROM users WITH EXPAND").expand;
    assert!(matches!(
        bare_expand,
        Some(ExpandOptions {
            graph: true,
            graph_depth: 1,
            cross_refs: true,
            ..
        })
    ));

    // Inputs that must be rejected.
    for rejected in [
        "SELECT * FROM users AS OF SNAPSHOT -1",
        "SELECT * FROM users AS OF UNKNOWN 'x'",
    ] {
        assert!(super::super::parse(rejected).is_err());
    }
}
1210
/// Drives the projection, GROUP BY, ORDER BY and function-literal parsing
/// helpers directly on a `Parser`, bypassing the full statement entry point.
#[test]
fn direct_parser_helpers_cover_projection_group_order_and_literals() {
    // Projection list.
    let mut projection_parser = Parser::new("name, LOWER(email) AS email_l").unwrap();
    let projections = projection_parser.parse_projection_list().unwrap();
    assert_eq!(projections.len(), 2);

    // GROUP BY list.
    let mut group_parser = Parser::new("dept, TIME_BUCKET(5 m)").unwrap();
    let group_by = group_parser.parse_group_by_list().unwrap();
    assert_eq!(group_by, vec!["dept", "TIME_BUCKET(5m)"]);

    // ORDER BY list.
    let mut order_parser = Parser::new("LOWER(name) DESC, created_at").unwrap();
    let order_by = order_parser.parse_order_by_list().unwrap();
    assert_eq!(order_by.len(), 2);
    let (by_lower_name, by_created_at) = (&order_by[0], &order_by[1]);
    assert!(by_lower_name.expr.is_some());
    assert!(!by_lower_name.ascending);
    assert!(by_lower_name.nulls_first);
    assert!(by_created_at.ascending);
    assert!(!by_created_at.nulls_first);

    // Function literal arguments: (input, expected rendering).
    for (input, expected) in [("-5 ms", "-5ms"), ("2.0 H", "2h")] {
        let mut literal_parser = Parser::new(input).unwrap();
        assert_eq!(literal_parser.parse_function_literal_arg().unwrap(), expected);
    }

    let mut rejecting_parser = Parser::new("bad").unwrap();
    assert!(rejecting_parser.parse_function_literal_arg().is_err());
}
1237
/// A `FROM (SELECT ...) AS u` source is kept as a subquery source on the
/// table query; a graph `MATCH` inside the parentheses is rejected.
#[test]
fn from_subquery_source_is_preserved() {
    let subquery = parse_table("FROM (SELECT id FROM users) AS u RETURN u.id");

    assert_eq!(subquery.table, "__subq_u");
    assert_eq!(subquery.alias.as_deref(), Some("u"));
    assert!(matches!(&subquery.source, Some(TableSource::Subquery(_))));
    assert_eq!(subquery.select_items.len(), 1);

    let graph_source = "FROM (MATCH (n) RETURN n) AS g";
    assert!(super::super::parse(graph_source).is_err());
}
1253
1254    // ── SESSIONIZE operator (issue #585 slice 8) ──
1255
/// SESSIONIZE with BY, GAP and ORDER BY populates all three clause fields.
#[test]
fn test_parse_sessionize_full_clause() {
    let sql = "SELECT user_id, ts FROM events SESSIONIZE BY user_id GAP 30 m ORDER BY ts";
    let clause = parse_table(sql).sessionize.expect("sessionize present");

    assert_eq!(clause.actor_col.as_deref(), Some("user_id"));
    assert_eq!(clause.gap_ms, Some(30 * 60_000));
    assert_eq!(clause.order_col.as_deref(), Some("ts"));
}
1266
/// SESSIONIZE without ORDER BY leaves `order_col` unset.
#[test]
fn test_parse_sessionize_omits_optional_order_by() {
    let sql = "SELECT * FROM events SESSIONIZE BY user_id GAP 5 s";
    let clause = parse_table(sql).sessionize.expect("sessionize present");

    assert_eq!(clause.actor_col.as_deref(), Some("user_id"));
    assert_eq!(clause.gap_ms, Some(5_000));
    assert!(clause.order_col.is_none());
}
1275
/// A bare SESSIONIZE keyword still yields a clause, with every field unset.
#[test]
fn test_parse_sessionize_bare_defers_to_descriptor() {
    // With BY and GAP both missing the parser still accepts the query;
    // the executor is the one that raises MissingSessionKey when the
    // descriptor provides no defaults.
    let clause = parse_table("SELECT * FROM events SESSIONIZE")
        .sessionize
        .expect("sessionize present");

    assert!(clause.actor_col.is_none());
    assert!(clause.gap_ms.is_none());
    assert!(clause.order_col.is_none());
}
1287
/// SESSIONIZE followed by WHERE and LIMIT: all three clauses are parsed.
#[test]
fn test_parse_sessionize_composes_with_where_and_limit() {
    let parsed = parse_table(
        "SELECT user_id FROM events \
         SESSIONIZE BY user_id GAP 1 m \
         WHERE user_id = 'u1' LIMIT 10",
    );

    // Trailing clauses first, then the sessionize clause is moved out.
    assert!(parsed.where_expr.is_some(), "WHERE still parsed");
    assert_eq!(parsed.limit, Some(10));

    let clause = parsed.sessionize.expect("sessionize present");
    assert_eq!(clause.actor_col.as_deref(), Some("user_id"));
    assert_eq!(clause.gap_ms, Some(60_000));
}
1301
/// Without a SESSIONIZE keyword the query carries no sessionize clause.
#[test]
fn test_parse_sessionize_absent_leaves_field_none() {
    let plain = parse_table("SELECT * FROM events");
    assert!(plain.sessionize.is_none());
}
1307
/// Mirrors the query shape used by the e2e tests: `session_id` appears in the
/// projection list and the SESSIONIZE clause carries BY, GAP and ORDER BY.
#[test]
fn test_parse_sessionize_with_session_id_in_projection_e2e_shape() {
    // Matches the literal shape e2e tests use — session_id in the
    // projection list must not confuse the parser.
    let q = parse_table(
        "SELECT id, user_id, ts, session_id FROM events \
         SESSIONIZE BY user_id GAP 30 s ORDER BY ts",
    );
    let s = q.sessionize.expect("sessionize present");
    assert_eq!(s.actor_col.as_deref(), Some("user_id"));
    assert_eq!(s.gap_ms, Some(30_000));
    // The query includes ORDER BY, so pin it as well; otherwise a regression
    // in ORDER BY capture for this shape would pass unnoticed.
    assert_eq!(s.order_col.as_deref(), Some("ts"));
}
1320}