Skip to main content

activecube_rs/compiler/
parser.rs

1use std::collections::{HashMap, HashSet};
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8use crate::schema::generator::{
9    CalculateRequest, DimAggRequest, FieldAliasMap, QuantileRequest, TimeIntervalRequest,
10    metric_key, dim_agg_key,
11};
12
/// Describes a metric requested in the GraphQL selection set.
pub struct MetricRequest {
    /// Aggregate function name as written in the query (e.g. `sum`, `count`);
    /// also used to look up cube-defined metric templates.
    pub function: String,
    /// GraphQL alias (e.g. `sum_in` in `sum_in: sum(...)`). Falls back to function name.
    pub alias: String,
    /// Dimension path the aggregate applies to; paths not found in the cube's
    /// flat dimensions fall back to `*` at query-build time.
    pub of_dimension: String,
    /// The raw selectWhere value extracted from GraphQL arguments.
    pub select_where_value: Option<async_graphql::Value>,
    /// Pre-parsed condition filter for conditional aggregation (countIf/sumIf).
    pub condition_filter: Option<FilterNode>,
}
24
/// Translate a GraphQL cube query (arguments plus pre-extracted selection-set
/// requests) into a backend-agnostic `QueryIR`.
///
/// Pipeline: resolve table → build WHERE tree (user `where`, selectors, chain
/// pin, defaults) → build SELECT list (dimensions, time buckets, dim aggs,
/// metrics, quantiles, calculates) → split aggregate filters into HAVING →
/// resolve ORDER BY / LIMIT BY against the unified key registry.
#[allow(clippy::too_many_arguments)]
pub fn parse_cube_query(
    cube: &CubeDefinition,
    network: &str,
    args: &ObjectAccessor,
    metrics: &[MetricRequest],
    quantiles: &[QuantileRequest],
    calculates: &[CalculateRequest],
    field_aliases: &FieldAliasMap,
    dim_aggs: &[DimAggRequest],
    time_intervals: &[TimeIntervalRequest],
    requested_fields: Option<HashSet<String>>,
) -> Result<QueryIR, async_graphql::Error> {
    let flat = cube.flat_dimensions();
    // Columns actually requested in the selection set (None = all); used by
    // the cube to pick the best physical table.
    let requested_cols: Vec<String> = flat.iter()
        .filter(|(path, _)| {
            requested_fields.as_ref().is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| dim.column.clone())
        .collect();
    let (schema, table) = cube.resolve_table(network, &requested_cols);

    // `where` argument → base filter tree; absent or non-object means no filter.
    let filters = if let Ok(where_val) = args.try_get("where") {
        if let Ok(where_obj) = where_val.object() {
            filter::parse_where(&where_obj, &cube.dimensions)?
        } else {
            FilterNode::Empty
        }
    } else {
        FilterNode::Empty
    };

    let filters = merge_selector_filters(filters, args, &cube.selectors)?;
    // Cubes backed by multi-chain tables pin the chain column to `network`.
    let filters = if let Some(ref chain_col) = cube.chain_column {
        let chain_filter = FilterNode::Condition {
            column: chain_col.clone(),
            op: CompareOp::Eq,
            value: SqlValue::String(network.to_string()),
        };
        if filters.is_empty() {
            chain_filter
        } else {
            FilterNode::And(vec![chain_filter, filters])
        }
    } else {
        filters
    };
    let filters = apply_default_filters(filters, &cube.default_filters);
    let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;

    // Plain dimension columns that appear in the GraphQL selection set.
    let mut selects: Vec<SelectExpr> = flat
        .iter()
        .filter(|(path, _)| {
            requested_fields
                .as_ref()
                .is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // Fallback: nothing matched and the query is not pure aggregation —
    // select every dimension rather than emit an empty SELECT list.
    if selects.is_empty() && !flat.is_empty() && metrics.is_empty() && dim_aggs.is_empty() {
        selects = flat
            .iter()
            .map(|(_, dim)| SelectExpr::Column {
                column: dim.column.clone(),
                alias: None,
            })
            .collect();
    }

    // --- Time interval bucketing: replace raw column with toStartOfInterval ---
    // Only the first select matching the column is rewritten (hence `break`).
    for ti in time_intervals {
        let interval_expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
        let alias = dim_agg_key(&ti.graphql_alias);
        for sel in &mut selects {
            if let SelectExpr::Column { column, alias: ref mut a } = sel {
                if column == &ti.column {
                    *column = interval_expr.clone();
                    *a = Some(alias.clone());
                    break;
                }
            }
        }
    }

    // User filters on aggregate expressions belong in HAVING, not WHERE.
    let (filters, agg_having) = split_aggregate_filters(filters);
    let mut group_by = Vec::new();
    let mut having = agg_having;

    // --- Aggregation mode: triggered by metrics, dim aggs, or quantiles ---
    if !metrics.is_empty() || !dim_aggs.is_empty() {
        let agg_columns: HashSet<String> = dim_aggs.iter()
            .map(|da| da.value_column.clone())
            .collect();

        // Every plain column that is not itself aggregated becomes a GROUP BY key.
        group_by = selects
            .iter()
            .filter_map(|s| match s {
                SelectExpr::Column { column, .. } if !agg_columns.contains(column) => {
                    Some(column.clone())
                }
                _ => None,
            })
            .collect();

        for da in dim_aggs {
            // The aggregated column must not also appear as a plain select.
            selects.retain(|s| !matches!(s, SelectExpr::Column { column, .. } if column == &da.value_column));
            let alias = dim_agg_key(&da.graphql_alias);
            let condition = da.condition_filter.as_ref().and_then(|f| {
                let sql = compile_filter_inline(f);
                if sql.is_empty() { None } else { Some(sql) }
            });
            let func_name = match da.agg_type {
                DimAggType::ArgMax => "argMax",
                DimAggType::ArgMin => "argMin",
            };

            selects.push(SelectExpr::DimAggregate {
                agg_type: da.agg_type.clone(),
                value_column: da.value_column.clone(),
                compare_column: da.compare_column.clone(),
                alias: alias.clone(),
                condition,
            });

            // selectWhere on a dim agg → HAVING condition on the agg expression.
            if let Some(async_graphql::Value::Object(ref obj)) = da.select_where_value {
                let agg_expr = format!("{func_name}(`{}`, `{}`)", da.value_column, da.compare_column);
                let h = parse_select_where_from_value(obj, &agg_expr)?;
                if !h.is_empty() {
                    having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
                }
            }
        }

        for m in metrics {
            // Resolve the dimension path to a physical column; unknown or
            // missing paths (e.g. plain count) fall back to "*".
            let dim_col = flat.iter()
                .find(|(path, _)| path == &m.of_dimension)
                .map(|(_, dim)| dim.column.clone())
                .unwrap_or_else(|| "*".to_string());
            let alias = metric_key(&m.alias);
            let metric_def = cube.find_metric(&m.function);

            // Cube-defined metrics may carry a SQL template with a {column} slot.
            if let Some(md) = metric_def.filter(|md| md.expression_template.is_some()) {
                let tmpl = md.expression_template.as_ref().unwrap();
                let expanded = tmpl.replace("{column}", &dim_col);
                selects.push(SelectExpr::Column { column: expanded, alias: Some(alias) });
            } else {
                let func = m.function.to_uppercase();
                let condition = m.condition_filter.as_ref().and_then(|f| {
                    let sql = compile_filter_inline(f);
                    if sql.is_empty() { None } else { Some(sql) }
                });
                selects.push(SelectExpr::Aggregate {
                    function: func.clone(), column: dim_col.clone(),
                    alias: alias.clone(), condition,
                });
                // selectWhere on a metric → HAVING on the expanded aggregate SQL.
                if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
                    let agg_expr = if func == "COUNT" && dim_col == "*" { "COUNT(*)".into() }
                        else if func == "COUNT" || func == "UNIQ" { format!("COUNT(DISTINCT `{dim_col}`)") }
                        else { format!("{func}(`{dim_col}`)") };
                    let h = parse_select_where_from_value(obj, &agg_expr)?;
                    if !h.is_empty() {
                        having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
                    }
                }
            }
        }
    }

    for q in quantiles {
        let dim_col = flat.iter()
            .find(|(path, _)| path == &q.of_dimension)
            .map(|(_, dim)| dim.column.clone())
            .unwrap_or_else(|| "*".to_string());
        let alias = metric_key(&q.alias);
        let expr = format!("quantile({})(`{}`)", q.level, dim_col);
        selects.push(SelectExpr::Column { column: expr, alias: Some(alias) });
        // Quantiles alone also force aggregation: group by the plain columns.
        if group_by.is_empty() && !selects.iter().any(|s| matches!(s, SelectExpr::Aggregate { .. })) {
            group_by = selects.iter().filter_map(|s| match s {
                SelectExpr::Column { column, alias } if alias.is_none() && !column.contains('(') => Some(column.clone()),
                _ => None,
            }).collect();
        }
    }

    // --- Build unified allowed_keys from finalized selects (Bitquery pattern) ---
    let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);

    for calc in calculates {
        let alias = metric_key(&calc.alias);
        let resolved = resolve_calculate_expr(&calc.expression, &allowed_keys);
        // ifNotFinite guards against inf/NaN (e.g. division by zero).
        selects.push(SelectExpr::Column {
            column: format!("ifNotFinite(({resolved}), 0)"),
            alias: Some(alias),
        });
    }

    ensure_having_columns_in_selects(&having, &mut selects);

    // Re-collect: calculates/HAVING injection may have added referencable keys.
    let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);
    let order_by = parse_order_by(args, cube, &allowed_keys)?;
    let limit_by = parse_limit_by(args, cube)?;

    let from_subquery = cube.from_subquery.as_ref().map(|s| {
        s.replace("{schema}", &schema).replace("{chain}", network)
    });

    Ok(QueryIR {
        cube: cube.name.clone(),
        schema,
        table,
        selects,
        filters,
        having,
        group_by,
        order_by,
        limit,
        offset,
        limit_by,
        use_final: cube.use_final,
        joins: Vec::new(),
        custom_query_builder: cube.custom_query_builder.clone(),
        from_subquery,
    })
}
253
254/// Parse a selectWhere value object (from GraphQL Value, not ObjectAccessor)
255/// into a HAVING FilterNode.
256fn parse_select_where_from_value(
257    obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
258    aggregate_expr: &str,
259) -> Result<FilterNode, async_graphql::Error> {
260    let mut conditions = Vec::new();
261
262    for (key, op) in &[
263        ("eq", CompareOp::Eq),
264        ("ne", CompareOp::Ne),
265        ("gt", CompareOp::Gt),
266        ("ge", CompareOp::Ge),
267        ("lt", CompareOp::Lt),
268        ("le", CompareOp::Le),
269    ] {
270        if let Some(val) = obj.get(*key) {
271            let sql_val = match val {
272                async_graphql::Value::String(s) => {
273                    if let Ok(f) = s.parse::<f64>() {
274                        SqlValue::Float(f)
275                    } else {
276                        SqlValue::String(s.clone())
277                    }
278                }
279                async_graphql::Value::Number(n) => {
280                    if let Some(f) = n.as_f64() {
281                        SqlValue::Float(f)
282                    } else {
283                        SqlValue::Int(n.as_i64().unwrap_or(0))
284                    }
285                }
286                _ => continue,
287            };
288            conditions.push(FilterNode::Condition {
289                column: aggregate_expr.to_string(),
290                op: op.clone(),
291                value: sql_val,
292            });
293        }
294    }
295
296    Ok(match conditions.len() {
297        0 => FilterNode::Empty,
298        1 => conditions.into_iter().next().unwrap(),
299        _ => FilterNode::And(conditions),
300    })
301}
302
303fn merge_selector_filters(
304    base: FilterNode,
305    args: &ObjectAccessor,
306    selectors: &[SelectorDef],
307) -> Result<FilterNode, async_graphql::Error> {
308    let mut extra = Vec::new();
309
310    for sel in selectors {
311        if let Ok(val) = args.try_get(&sel.graphql_name) {
312            if let Ok(obj) = val.object() {
313                let leaf_filters =
314                    filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
315                extra.extend(leaf_filters);
316            }
317        }
318    }
319
320    if extra.is_empty() {
321        return Ok(base);
322    }
323    if base.is_empty() {
324        return Ok(if extra.len() == 1 {
325            extra.remove(0)
326        } else {
327            FilterNode::And(extra)
328        });
329    }
330    extra.push(base);
331    Ok(FilterNode::And(extra))
332}
333
334fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
335    if defaults.is_empty() {
336        return user_filters;
337    }
338
339    let mut default_nodes: Vec<FilterNode> = defaults
340        .iter()
341        .map(|(col, val)| {
342            let sql_val = if val == "true" || val == "false" {
343                SqlValue::Bool(val == "true")
344            } else if let Ok(n) = val.parse::<i64>() {
345                SqlValue::Int(n)
346            } else {
347                SqlValue::String(val.clone())
348            };
349            FilterNode::Condition {
350                column: col.clone(),
351                op: CompareOp::Eq,
352                value: sql_val,
353            }
354        })
355        .collect();
356
357    if user_filters.is_empty() {
358        if default_nodes.len() == 1 {
359            return default_nodes.remove(0);
360        }
361        return FilterNode::And(default_nodes);
362    }
363
364    default_nodes.push(user_filters);
365    FilterNode::And(default_nodes)
366}
367
368fn parse_limit(
369    args: &ObjectAccessor,
370    default: u32,
371    max: u32,
372) -> Result<(u32, u32), async_graphql::Error> {
373    let mut limit = default;
374    let mut offset = 0u32;
375
376    if let Ok(limit_val) = args.try_get("limit") {
377        if let Ok(limit_obj) = limit_val.object() {
378            if let Ok(count) = limit_obj.try_get("count") {
379                limit = (count.i64()? as u32).min(max);
380            }
381            if let Ok(off) = limit_obj.try_get("offset") {
382                offset = off.i64()? as u32;
383            }
384        }
385    }
386
387    Ok((limit, offset))
388}
389
390fn parse_order_by(
391    args: &ObjectAccessor,
392    cube: &CubeDefinition,
393    allowed_keys: &HashMap<String, String>,
394) -> Result<Vec<OrderExpr>, async_graphql::Error> {
395    let order_val = match args.try_get("orderBy") {
396        Ok(v) => v,
397        Err(_) => return Ok(Vec::new()),
398    };
399
400    let obj = order_val.object()
401        .map_err(|_| async_graphql::Error::new("orderBy must be an object"))?;
402    let flat = cube.flat_dimensions();
403
404    if let Ok(field) = obj.try_get("descending") {
405        let path = field.enum_name()
406            .map_err(|_| async_graphql::Error::new("orderBy.descending must be an enum value"))?;
407        let column = flat.iter()
408            .find(|(p, _)| p == path)
409            .map(|(_, dim)| dim.column.clone())
410            .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
411        return Ok(vec![OrderExpr { column, descending: true }]);
412    }
413
414    if let Ok(field) = obj.try_get("ascending") {
415        let path = field.enum_name()
416            .map_err(|_| async_graphql::Error::new("orderBy.ascending must be an enum value"))?;
417        let column = flat.iter()
418            .find(|(p, _)| p == path)
419            .map(|(_, dim)| dim.column.clone())
420            .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
421        return Ok(vec![OrderExpr { column, descending: false }]);
422    }
423
424    if let Ok(field_str) = obj.try_get("descendingByField") {
425        let name = field_str.string()
426            .map_err(|_| async_graphql::Error::new("descendingByField must be a string"))?;
427        let column = resolve_field_in_keys(name, allowed_keys)?;
428        return Ok(vec![OrderExpr { column, descending: true }]);
429    }
430
431    if let Ok(field_str) = obj.try_get("ascendingByField") {
432        let name = field_str.string()
433            .map_err(|_| async_graphql::Error::new("ascendingByField must be a string"))?;
434        let column = resolve_field_in_keys(name, allowed_keys)?;
435        return Ok(vec![OrderExpr { column, descending: false }]);
436    }
437
438    Ok(vec![])
439}
440
441/// Resolve a field reference against the unified allowed_keys registry.
442/// Tries the name as-is, then with metric/dim_agg prefixes.
443fn resolve_field_in_keys(
444    name: &str,
445    allowed_keys: &HashMap<String, String>,
446) -> Result<String, async_graphql::Error> {
447    if let Some(expr) = allowed_keys.get(name) { return Ok(expr.clone()); }
448    Err(async_graphql::Error::new(format!(
449        "Can't use '{name}' in sorting/ordering. Field not found in executed query."
450    )))
451}
452
/// Build unified allowed_keys from finalized selects.
/// Maps user-facing reference names → SQL column/expression.
/// Follows Bitquery convention for `descendingByField`:
///   - `{alias}` for metrics/dim aggs (e.g. "count", "high")
///   - `{parent}_{alias}_{suffix}` for dim aggs (e.g. "BalanceUpdate_Balance_maximum")
///   - `{parent}_{alias}` for time intervals (e.g. "Block_Timefield")
fn collect_select_keys(
    selects: &[SelectExpr],
    flat: &[(String, crate::cube::definition::Dimension)],
    field_aliases: &FieldAliasMap,
    dim_aggs: &[DimAggRequest],
    time_intervals: &[TimeIntervalRequest],
) -> HashMap<String, String> {
    let mut keys = HashMap::new();
    for sel in selects {
        match sel {
            SelectExpr::Column { column, alias: Some(a) } => {
                keys.insert(a.clone(), column.clone());
                // Also register the alias without its internal prefix
                // ("__da_" for dim aggs, otherwise "__" for metrics).
                if let Some(name) = a.strip_prefix("__da_") {
                    keys.insert(name.to_string(), column.clone());
                } else if let Some(name) = a.strip_prefix("__") {
                    keys.insert(name.to_string(), column.clone());
                }
            }
            SelectExpr::Column { column, alias: None } => {
                // Plain dimension: addressable by its GraphQL path (if known)
                // and always by the raw column name.
                if let Some((path, _)) = flat.iter().find(|(_, d)| d.column == *column) {
                    keys.insert(path.clone(), column.clone());
                }
                keys.insert(column.clone(), column.clone());
            }
            SelectExpr::Aggregate { alias, function, column, .. } => {
                let expr = format_agg_sql(function, column);
                keys.insert(alias.clone(), expr.clone());
                if let Some(name) = alias.strip_prefix("__") {
                    keys.insert(name.to_string(), expr);
                }
            }
            SelectExpr::DimAggregate { alias, agg_type, value_column, compare_column, .. } => {
                let expr = format_dim_agg_sql(agg_type, value_column, compare_column);
                keys.insert(alias.clone(), expr.clone());
                if let Some(name) = alias.strip_prefix("__da_") {
                    keys.insert(name.to_string(), expr);
                }
            }
        }
    }
    // Bitquery convention: dim agg references — both with and without _maximum/_minimum suffix.
    // `entry().or_insert_with` keeps earlier (select-derived) mappings authoritative.
    for da in dim_aggs {
        let suffix = match da.agg_type { DimAggType::ArgMax => "maximum", DimAggType::ArgMin => "minimum" };
        let expr = format_dim_agg_sql(&da.agg_type, &da.value_column, &da.compare_column);
        // {alias}_{suffix} — e.g. "QuotePostAmount_maximum"
        keys.entry(format!("{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
        // {field_path}_{suffix} — e.g. "Pool_Quote_PostAmount_maximum"
        keys.entry(format!("{}_{suffix}", da.field_path)).or_insert_with(|| expr.clone());
        if let Some(i) = da.field_path.rfind('_') {
            let parent = &da.field_path[..i];
            // {parent}_{alias}_{suffix} — e.g. "Pool_Quote_QuotePostAmount_maximum"
            keys.entry(format!("{parent}_{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
            // {parent}_{alias} (no suffix) — for calculate $variable refs, e.g. "Trade_CurrentPrice"
            keys.entry(format!("{parent}_{}", da.graphql_alias)).or_insert_with(|| expr.clone());
        }
    }
    // Bitquery convention: time interval → {parent}_{alias}
    for ti in time_intervals {
        let expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
        if let Some(i) = ti.field_path.rfind('_') {
            let parent = &ti.field_path[..i];
            keys.entry(format!("{parent}_{}", ti.graphql_alias)).or_insert_with(|| expr);
        }
    }
    // Schema-level alias paths map to their backticked physical column.
    for (alias_path, column) in field_aliases {
        keys.entry(alias_path.clone()).or_insert_with(|| format!("`{column}`"));
    }
    keys
}
528
/// Render an aggregate call as SQL: `count()` for COUNT(*), otherwise the
/// lowercased function applied to the (backtick-quoted) column. Columns that
/// already contain '(' are treated as expressions and left unquoted.
fn format_agg_sql(function: &str, column: &str) -> String {
    let upper = function.to_uppercase();
    if upper == "COUNT" && column == "*" {
        return "count()".to_string();
    }
    let quoted = if column.contains('(') {
        column.to_string()
    } else {
        format!("`{column}`")
    };
    if upper == "UNIQ" {
        format!("uniq({quoted})")
    } else {
        format!("{}({quoted})", upper.to_lowercase())
    }
}
538
539fn format_dim_agg_sql(agg_type: &DimAggType, value_column: &str, compare_column: &str) -> String {
540    let func = match agg_type { DimAggType::ArgMax => "argMax", DimAggType::ArgMin => "argMin" };
541    let qval = if value_column.contains('(') { value_column.to_string() } else { format!("`{value_column}`") };
542    let qcmp = if compare_column.contains('(') { compare_column.to_string() } else { format!("`{compare_column}`") };
543    format!("{func}({qval}, {qcmp})")
544}
545
546/// Resolve `$variable` references in a calculate expression using allowed_keys.
547fn resolve_calculate_expr(expression: &str, allowed_keys: &HashMap<String, String>) -> String {
548    let mut result = String::new();
549    let mut chars = expression.chars().peekable();
550    while let Some(ch) = chars.next() {
551        if ch == '$' {
552            let mut var_name = String::new();
553            while let Some(&c) = chars.peek() {
554                if c.is_alphanumeric() || c == '_' {
555                    var_name.push(c);
556                    chars.next();
557                } else {
558                    break;
559                }
560            }
561            if !var_name.is_empty() {
562                if let Some(resolved) = allowed_keys.get(&var_name) {
563                    let col_ref = if resolved.contains('(') { resolved.clone() } else { format!("`{resolved}`") };
564                    result.push_str(&format!("toFloat64({col_ref})"));
565                } else {
566                    result.push_str(&format!("toFloat64(`{}`)", metric_key(&var_name)));
567                }
568            } else {
569                result.push('$');
570            }
571        } else {
572            result.push(ch);
573        }
574    }
575    result
576}
577
/// Build a ClickHouse `toStartOfInterval` bucketing expression for `column`.
/// Unknown units silently fall back to MINUTE.
fn time_interval_sql(column: &str, unit: &str, count: i64) -> String {
    const UNITS: [(&str, &str); 6] = [
        ("seconds", "SECOND"),
        ("minutes", "MINUTE"),
        ("hours", "HOUR"),
        ("days", "DAY"),
        ("weeks", "WEEK"),
        ("months", "MONTH"),
    ];
    let unit_sql = UNITS
        .iter()
        .find(|(name, _)| *name == unit)
        .map(|(_, sql)| *sql)
        .unwrap_or("MINUTE");
    format!("toStartOfInterval(`{column}`, INTERVAL {count} {unit_sql})")
}
585
/// Compile a FilterNode into an inline SQL fragment (no parameterized bindings).
/// Used for embedding conditions inside aggregate functions (countIf, sumIf).
///
/// NOTE(review): string values are escaped for single quotes only; confirm that
/// upstream validation restricts these values before they can carry other SQL
/// metacharacters, since this path bypasses parameter binding.
fn compile_filter_inline(node: &FilterNode) -> String {
    match node {
        FilterNode::Empty => String::new(),
        FilterNode::Condition { column, op, value } => {
            // Expressions (containing '(') pass through; bare columns get backticks.
            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
            // Unary operators (no right-hand side) short-circuit here.
            if op.is_unary() {
                return format!("{col} {}", op.sql_op());
            }
            let val_str = match value {
                SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
                SqlValue::Int(i) => i.to_string(),
                SqlValue::Float(f) => f.to_string(),
                SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
                SqlValue::Expression(e) => e.clone(),
            };
            match op {
                CompareOp::In | CompareOp::NotIn => {
                    // A string payload is a comma-separated list: "a,b" -> ('a', 'b').
                    if let SqlValue::String(csv) = value {
                        let items: Vec<String> = csv.split(',')
                            .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
                            .collect();
                        format!("{col} {} ({})", op.sql_op(), items.join(", "))
                    } else {
                        format!("{col} {} ({val_str})", op.sql_op())
                    }
                }
                CompareOp::Includes => {
                    // Substring match; LIKE wildcards in the needle are not escaped.
                    let like_val = match value {
                        SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
                        _ => val_str,
                    };
                    format!("{col} LIKE {like_val}")
                }
                _ => format!("{col} {} {val_str}", op.sql_op()),
            }
        }
        FilterNode::And(children) => {
            // Empty fragments from children are dropped before joining.
            let parts: Vec<String> = children.iter()
                .map(compile_filter_inline)
                .filter(|s| !s.is_empty())
                .collect();
            match parts.len() {
                0 => String::new(),
                1 => parts.into_iter().next().unwrap(),
                _ => format!("({})", parts.join(" AND ")),
            }
        }
        FilterNode::Or(children) => {
            let parts: Vec<String> = children.iter()
                .map(compile_filter_inline)
                .filter(|s| !s.is_empty())
                .collect();
            match parts.len() {
                0 => String::new(),
                1 => parts.into_iter().next().unwrap(),
                _ => format!("({})", parts.join(" OR ")),
            }
        }
        FilterNode::ArrayIncludes { .. } => {
            // ArrayIncludes is compiled by the SQL dialect, not inline.
            // In conditional aggregation context, this is not expected.
            String::new()
        }
    }
}
653
654/// Walk a HAVING FilterNode and append any referenced aggregate columns that
655/// are missing from `selects`. The SQL dialect will assign aliases later.
656fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
657    let cols = collect_having_columns(having);
658    for col in cols {
659        if !col.contains('(') {
660            continue;
661        }
662        let already_present = selects.iter().any(|s| match s {
663            SelectExpr::Column { column, .. } => column == &col,
664            _ => false,
665        });
666        if !already_present {
667            selects.push(SelectExpr::Column {
668                column: col,
669                alias: None,
670            });
671        }
672    }
673}
674
675fn collect_having_columns(node: &FilterNode) -> Vec<String> {
676    match node {
677        FilterNode::Empty => vec![],
678        FilterNode::Condition { column, .. } => vec![column.clone()],
679        FilterNode::And(children) | FilterNode::Or(children) => {
680            children.iter().flat_map(collect_having_columns).collect()
681        }
682        FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
683    }
684}
685
/// Detect if a column expression is a function call rather than a bare column.
/// Any '(' marks it as an expression — e.g. `argMaxMerge(...)`, `countMerge(...)`.
fn is_aggregate_column(column: &str) -> bool {
    column.chars().any(|c| c == '(')
}
691
692/// Walk a FilterNode tree and split it into (where_part, having_part).
693/// Leaf conditions on aggregate columns go to HAVING; everything else stays in WHERE.
694fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
695    match node {
696        FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
697        FilterNode::Condition { ref column, .. } => {
698            if is_aggregate_column(column) {
699                (FilterNode::Empty, node)
700            } else {
701                (node, FilterNode::Empty)
702            }
703        }
704        FilterNode::And(children) => {
705            let mut where_parts = Vec::new();
706            let mut having_parts = Vec::new();
707            for child in children {
708                let (w, h) = split_aggregate_filters(child);
709                if !w.is_empty() { where_parts.push(w); }
710                if !h.is_empty() { having_parts.push(h); }
711            }
712            let where_node = match where_parts.len() {
713                0 => FilterNode::Empty,
714                1 => where_parts.into_iter().next().unwrap(),
715                _ => FilterNode::And(where_parts),
716            };
717            let having_node = match having_parts.len() {
718                0 => FilterNode::Empty,
719                1 => having_parts.into_iter().next().unwrap(),
720                _ => FilterNode::And(having_parts),
721            };
722            (where_node, having_node)
723        }
724        FilterNode::Or(children) => {
725            let any_aggregate = children.iter().any(filter_has_aggregate);
726            if any_aggregate {
727                (FilterNode::Empty, FilterNode::Or(children))
728            } else {
729                (FilterNode::Or(children), FilterNode::Empty)
730            }
731        }
732        FilterNode::ArrayIncludes { .. } => {
733            // ArrayIncludes always goes to WHERE, never HAVING
734            (node, FilterNode::Empty)
735        }
736    }
737}
738
739fn filter_has_aggregate(node: &FilterNode) -> bool {
740    match node {
741        FilterNode::Empty => false,
742        FilterNode::Condition { column, .. } => is_aggregate_column(column),
743        FilterNode::And(children) | FilterNode::Or(children) => {
744            children.iter().any(filter_has_aggregate)
745        }
746        FilterNode::ArrayIncludes { .. } => false,
747    }
748}
749
750fn parse_limit_by(
751    args: &ObjectAccessor,
752    cube: &CubeDefinition,
753) -> Result<Option<LimitByExpr>, async_graphql::Error> {
754    let lb_val = match args.try_get("limitBy") {
755        Ok(v) => v,
756        Err(_) => return Ok(None),
757    };
758    let lb_obj = lb_val.object()?;
759    let count = lb_obj.try_get("count")?.i64()? as u32;
760    let offset = lb_obj
761        .try_get("offset")
762        .ok()
763        .and_then(|v| v.i64().ok())
764        .unwrap_or(0) as u32;
765    let by_str = lb_obj.try_get("by")?.string()?;
766
767    let flat = cube.flat_dimensions();
768    let columns: Vec<String> = by_str
769        .split(',')
770        .map(|s| {
771            let trimmed = s.trim();
772            flat.iter()
773                .find(|(path, _)| path == trimmed)
774                .map(|(_, dim)| dim.column.clone())
775                .unwrap_or_else(|| trimmed.to_string())
776        })
777        .collect();
778
779    if columns.is_empty() {
780        return Err(async_graphql::Error::new("limitBy.by must specify at least one field"));
781    }
782
783    Ok(Some(LimitByExpr { count, offset, columns }))
784}