//! activecube_rs/compiler/parser.rs — GraphQL selection → QueryIR parsing.
1use std::collections::{HashMap, HashSet};
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8use crate::schema::generator::{
9    CalculateRequest, DimAggRequest, FieldAliasMap, QuantileRequest, TimeIntervalRequest,
10    metric_key, dim_agg_key,
11};
12
/// Describes a metric requested in the GraphQL selection set.
pub struct MetricRequest {
    /// Aggregate function name as requested (e.g. `sum`, `count`); looked up
    /// via `cube.find_metric` and otherwise uppercased into a SQL function.
    pub function: String,
    /// GraphQL alias (e.g. `sum_in` in `sum_in: sum(...)`). Falls back to function name.
    pub alias: String,
    /// Flattened dimension path the metric aggregates over; resolved against
    /// `cube.flat_dimensions()` (falls back to `*` when not found).
    pub of_dimension: String,
    /// The raw selectWhere value extracted from GraphQL arguments.
    pub select_where_value: Option<async_graphql::Value>,
    /// Pre-parsed condition filter for conditional aggregation (countIf/sumIf).
    pub condition_filter: Option<FilterNode>,
}
24
/// Translate a parsed GraphQL cube query into the backend-agnostic [`QueryIR`].
///
/// Pipeline (order matters):
/// 1. Resolve schema/table from the dimensions actually requested.
/// 2. Assemble WHERE filters: `where` arg → selector args → chain column → cube defaults.
/// 3. Build the SELECT list: requested flat dimensions, array columns, time buckets.
/// 4. Aggregation mode (any metrics or dim-aggs): derive GROUP BY, expand dim-agg and
///    metric expressions, fold `selectWhere` conditions into HAVING.
/// 5. Add quantiles and `calculate` expressions, then validate ORDER BY against the
///    GROUP BY context and assemble the final IR.
///
/// # Errors
/// Returns a GraphQL error for malformed filter/limit/order arguments, or for
/// ORDER BY fields incompatible with an aggregated query.
#[allow(clippy::too_many_arguments)]
pub fn parse_cube_query(
    cube: &CubeDefinition,
    network: &str,
    args: &ObjectAccessor,
    metrics: &[MetricRequest],
    quantiles: &[QuantileRequest],
    calculates: &[CalculateRequest],
    field_aliases: &FieldAliasMap,
    dim_aggs: &[DimAggRequest],
    time_intervals: &[TimeIntervalRequest],
    requested_fields: Option<HashSet<String>>,
) -> Result<QueryIR, async_graphql::Error> {
    let flat = cube.flat_dimensions();
    // Columns backing the requested field paths; `None` requested_fields means
    // "everything". Used to pick the cheapest table that covers them.
    let requested_cols: Vec<String> = flat.iter()
        .filter(|(path, _)| {
            requested_fields.as_ref().is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| dim.column.clone())
        .collect();
    let (schema, table) = cube.resolve_table(network, &requested_cols);

    // Base filters from the `where` argument; absent/non-object → Empty.
    let filters = if let Ok(where_val) = args.try_get("where") {
        if let Ok(where_obj) = where_val.object() {
            filter::parse_where(&where_obj, &cube.dimensions)?
        } else {
            FilterNode::Empty
        }
    } else {
        FilterNode::Empty
    };

    // Layer in selector-argument filters, then pin the chain column to the
    // requested network (for multi-chain tables), then cube-level defaults.
    let filters = merge_selector_filters(filters, args, &cube.selectors)?;
    let filters = if let Some(ref chain_col) = cube.chain_column {
        let chain_filter = FilterNode::Condition {
            column: chain_col.clone(),
            op: CompareOp::Eq,
            value: SqlValue::String(network.to_string()),
        };
        if filters.is_empty() {
            chain_filter
        } else {
            FilterNode::And(vec![chain_filter, filters])
        }
    } else {
        filters
    };
    let filters = apply_default_filters(filters, &cube.default_filters);
    let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;

    // SELECT list starts as the requested flat dimension columns.
    let mut selects: Vec<SelectExpr> = flat
        .iter()
        .filter(|(path, _)| {
            requested_fields
                .as_ref()
                .is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // Include Array columns when the GraphQL selection requests them.
    // Array dimensions are not in flat_dimensions(), but their parallel
    // ClickHouse columns must appear in SELECT for the resolver to work.
    let array_cols = cube.array_columns();
    if !array_cols.is_empty() {
        let selected_cols: HashSet<String> = selects.iter()
            .filter_map(|s| match s {
                SelectExpr::Column { column, .. } => Some(column.clone()),
                _ => None,
            })
            .collect();
        for (path, col) in &array_cols {
            if selected_cols.contains(col) {
                continue;
            }
            let should_include = requested_fields.as_ref().is_none_or(|rf| {
                // Array path like "Instruction_Program_Arguments_Name".
                // The parent prefix (e.g. "Instruction_Program_Arguments") must
                // be a prefix of at least one requested field path for us to
                // include this column.
                let parent = path.rsplit_once('_').map(|(p, _)| p).unwrap_or(path);
                rf.iter().any(|f| f.starts_with(parent))
            });
            if should_include {
                selects.push(SelectExpr::Column {
                    column: col.clone(),
                    alias: None,
                });
            }
        }
    }

    // Safety net: if field filtering removed every column and there is no
    // aggregation to define the output, fall back to selecting all dimensions.
    if selects.is_empty() && !flat.is_empty() && metrics.is_empty() && dim_aggs.is_empty() {
        selects = flat
            .iter()
            .map(|(_, dim)| SelectExpr::Column {
                column: dim.column.clone(),
                alias: None,
            })
            .collect();
    }

    // --- Time interval bucketing: replace raw column with toStartOfInterval ---
    for ti in time_intervals {
        let interval_expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
        let alias = dim_agg_key(&ti.graphql_alias);
        // Rewrite the first matching plain column in place; it keeps its
        // position in the SELECT list but becomes an aliased expression.
        for sel in &mut selects {
            if let SelectExpr::Column { column, alias: ref mut a } = sel {
                if column == &ti.column {
                    *column = interval_expr.clone();
                    *a = Some(alias.clone());
                    break;
                }
            }
        }
    }

    // Aggregate conditions written in `where` belong in HAVING, not WHERE.
    let (filters, agg_having) = split_aggregate_filters(filters);
    let mut group_by = Vec::new();
    let mut having = agg_having;

    // --- Aggregation mode: triggered by metrics, dim aggs, or quantiles ---
    if !metrics.is_empty() || !dim_aggs.is_empty() {
        // Columns consumed by dim-aggs must not appear in GROUP BY.
        let agg_columns: HashSet<String> = dim_aggs.iter()
            .map(|da| da.value_column.clone())
            .collect();

        group_by = selects
            .iter()
            .filter_map(|s| match s {
                SelectExpr::Column { column, .. } if !agg_columns.contains(column) => {
                    Some(column.clone())
                }
                _ => None,
            })
            .collect();

        for da in dim_aggs {
            // The raw column is replaced by its argMax/argMin expression.
            selects.retain(|s| !matches!(s, SelectExpr::Column { column, .. } if column == &da.value_column));
            let alias = dim_agg_key(&da.graphql_alias);
            let condition = da.condition_filter.as_ref().and_then(|f| {
                let sql = compile_filter_inline(f);
                if sql.is_empty() { None } else { Some(sql) }
            });
            let func_name = match da.agg_type {
                DimAggType::ArgMax => "argMax",
                DimAggType::ArgMin => "argMin",
            };

            selects.push(SelectExpr::DimAggregate {
                agg_type: da.agg_type.clone(),
                value_column: da.value_column.clone(),
                compare_column: da.compare_column.clone(),
                alias: alias.clone(),
                condition,
            });

            // selectWhere on a dim-agg becomes a HAVING condition on the
            // full argMax/argMin expression.
            if let Some(async_graphql::Value::Object(ref obj)) = da.select_where_value {
                let agg_expr = format!("{func_name}(`{}`, `{}`)", da.value_column, da.compare_column);
                let h = parse_select_where_from_value(obj, &agg_expr)?;
                if !h.is_empty() {
                    having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
                }
            }
        }

        for m in metrics {
            let dim_col = flat.iter()
                .find(|(path, _)| path == &m.of_dimension)
                .map(|(_, dim)| dim.column.clone())
                .unwrap_or_else(|| "*".to_string());
            let alias = metric_key(&m.alias);
            let metric_def = cube.find_metric(&m.function);

            // Cube-defined metrics with a SQL template bypass the generic
            // aggregate path: `{column}` is substituted directly.
            if let Some(md) = metric_def.filter(|md| md.expression_template.is_some()) {
                let tmpl = md.expression_template.as_ref().unwrap();
                let expanded = tmpl.replace("{column}", &dim_col);
                selects.push(SelectExpr::Column { column: expanded, alias: Some(alias) });
            } else {
                let func = m.function.to_uppercase();
                let condition = m.condition_filter.as_ref().and_then(|f| {
                    let sql = compile_filter_inline(f);
                    if sql.is_empty() { None } else { Some(sql) }
                });
                selects.push(SelectExpr::Aggregate {
                    function: func.clone(), column: dim_col.clone(),
                    alias: alias.clone(), condition,
                });
                // selectWhere → HAVING over the equivalent SQL expression.
                if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
                    let agg_expr = if func == "COUNT" && dim_col == "*" { "COUNT(*)".into() }
                        else if func == "COUNT" || func == "UNIQ" { format!("COUNT(DISTINCT `{dim_col}`)") }
                        else { format!("{func}(`{dim_col}`)") };
                    let h = parse_select_where_from_value(obj, &agg_expr)?;
                    if !h.is_empty() {
                        having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
                    }
                }
            }
        }
    }

    for q in quantiles {
        let dim_col = flat.iter()
            .find(|(path, _)| path == &q.of_dimension)
            .map(|(_, dim)| dim.column.clone())
            .unwrap_or_else(|| "*".to_string());
        let alias = metric_key(&q.alias);
        let expr = format!("quantile({})(`{}`)", q.level, dim_col);
        selects.push(SelectExpr::Column { column: expr, alias: Some(alias) });
        // A quantile alone also forces aggregation: group by the remaining
        // plain (un-aliased, non-expression) columns if nothing else did.
        if group_by.is_empty() && !selects.iter().any(|s| matches!(s, SelectExpr::Aggregate { .. })) {
            group_by = selects.iter().filter_map(|s| match s {
                SelectExpr::Column { column, alias } if alias.is_none() && !column.contains('(') => Some(column.clone()),
                _ => None,
            }).collect();
        }
    }

    // --- Build unified allowed_keys from finalized selects (Bitquery pattern) ---
    let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);

    for calc in calculates {
        let alias = metric_key(&calc.alias);
        let resolved = resolve_calculate_expr(&calc.expression, &allowed_keys);
        // ifNotFinite guards against NaN/Inf from e.g. division by zero.
        selects.push(SelectExpr::Column {
            column: format!("ifNotFinite(({resolved}), 0)"),
            alias: Some(alias),
        });
    }

    ensure_having_columns_in_selects(&having, &mut selects);

    // Recompute: calculates/HAVING injection may have added select entries.
    let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);
    let order_by = parse_order_by(args, cube, &allowed_keys)?;

    // Validate ORDER BY columns are compatible with GROUP BY context.
    // In aggregation mode, ClickHouse requires ORDER BY columns to be either
    // in GROUP BY or be aggregate expressions. Mirrors Ruby activecube behaviour
    // which rejects ordering by fields not present in the query.
    if !group_by.is_empty() && !order_by.is_empty() {
        let group_set: HashSet<&str> = group_by.iter().map(|s| s.as_str()).collect();
        let select_exprs: HashSet<&str> = selects.iter().map(|s| match s {
            SelectExpr::Column { column, .. } => column.as_str(),
            SelectExpr::Aggregate { alias, .. } => alias.as_str(),
            SelectExpr::DimAggregate { alias, .. } => alias.as_str(),
        }).collect();
        for o in &order_by {
            let col = o.column.as_str();
            let is_in_group = group_set.contains(col);
            let is_aggregate = col.contains('(') || select_exprs.contains(col);
            if !is_in_group && !is_aggregate {
                // Report the GraphQL field path, not the raw column name.
                let field_name = flat.iter()
                    .find(|(_, dim)| dim.column == col)
                    .map(|(path, _)| path.as_str())
                    .unwrap_or(col);
                return Err(async_graphql::Error::new(format!(
                    "Cannot order by '{}' in aggregation query — add the field to your selection or order by an aggregated metric instead.",
                    field_name,
                )));
            }
        }
    }

    let limit_by = parse_limit_by(args, cube)?;

    let from_subquery = cube.from_subquery.as_ref().map(|s| {
        s.replace("{schema}", &schema).replace("{chain}", network)
    });

    Ok(QueryIR {
        cube: cube.name.clone(),
        schema,
        table,
        selects,
        filters,
        having,
        group_by,
        order_by,
        limit,
        offset,
        limit_by,
        use_final: cube.use_final,
        joins: Vec::new(),
        custom_query_builder: cube.custom_query_builder.clone(),
        from_subquery,
    })
}
314
315/// Parse a selectWhere value object (from GraphQL Value, not ObjectAccessor)
316/// into a HAVING FilterNode.
317fn parse_select_where_from_value(
318    obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
319    aggregate_expr: &str,
320) -> Result<FilterNode, async_graphql::Error> {
321    let mut conditions = Vec::new();
322
323    for (key, op) in &[
324        ("eq", CompareOp::Eq),
325        ("ne", CompareOp::Ne),
326        ("gt", CompareOp::Gt),
327        ("ge", CompareOp::Ge),
328        ("lt", CompareOp::Lt),
329        ("le", CompareOp::Le),
330    ] {
331        if let Some(val) = obj.get(*key) {
332            let sql_val = match val {
333                async_graphql::Value::String(s) => {
334                    if let Ok(f) = s.parse::<f64>() {
335                        SqlValue::Float(f)
336                    } else {
337                        SqlValue::String(s.clone())
338                    }
339                }
340                async_graphql::Value::Number(n) => {
341                    if let Some(f) = n.as_f64() {
342                        SqlValue::Float(f)
343                    } else {
344                        SqlValue::Int(n.as_i64().unwrap_or(0))
345                    }
346                }
347                _ => continue,
348            };
349            conditions.push(FilterNode::Condition {
350                column: aggregate_expr.to_string(),
351                op: op.clone(),
352                value: sql_val,
353            });
354        }
355    }
356
357    Ok(match conditions.len() {
358        0 => FilterNode::Empty,
359        1 => conditions.into_iter().next().unwrap(),
360        _ => FilterNode::And(conditions),
361    })
362}
363
364fn merge_selector_filters(
365    base: FilterNode,
366    args: &ObjectAccessor,
367    selectors: &[SelectorDef],
368) -> Result<FilterNode, async_graphql::Error> {
369    let mut extra = Vec::new();
370
371    for sel in selectors {
372        if let Ok(val) = args.try_get(&sel.graphql_name) {
373            if let Ok(obj) = val.object() {
374                let leaf_filters =
375                    filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
376                extra.extend(leaf_filters);
377            }
378        }
379    }
380
381    if extra.is_empty() {
382        return Ok(base);
383    }
384    if base.is_empty() {
385        return Ok(if extra.len() == 1 {
386            extra.remove(0)
387        } else {
388            FilterNode::And(extra)
389        });
390    }
391    extra.push(base);
392    Ok(FilterNode::And(extra))
393}
394
395fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
396    if defaults.is_empty() {
397        return user_filters;
398    }
399
400    let mut default_nodes: Vec<FilterNode> = defaults
401        .iter()
402        .map(|(col, val)| {
403            let sql_val = if val == "true" || val == "false" {
404                SqlValue::Bool(val == "true")
405            } else if let Ok(n) = val.parse::<i64>() {
406                SqlValue::Int(n)
407            } else {
408                SqlValue::String(val.clone())
409            };
410            FilterNode::Condition {
411                column: col.clone(),
412                op: CompareOp::Eq,
413                value: sql_val,
414            }
415        })
416        .collect();
417
418    if user_filters.is_empty() {
419        if default_nodes.len() == 1 {
420            return default_nodes.remove(0);
421        }
422        return FilterNode::And(default_nodes);
423    }
424
425    default_nodes.push(user_filters);
426    FilterNode::And(default_nodes)
427}
428
429fn parse_limit(
430    args: &ObjectAccessor,
431    default: u32,
432    max: u32,
433) -> Result<(u32, u32), async_graphql::Error> {
434    let mut limit = default;
435    let mut offset = 0u32;
436
437    if let Ok(limit_val) = args.try_get("limit") {
438        if let Ok(limit_obj) = limit_val.object() {
439            if let Ok(count) = limit_obj.try_get("count") {
440                limit = (count.i64()? as u32).min(max);
441            }
442            if let Ok(off) = limit_obj.try_get("offset") {
443                offset = off.i64()? as u32;
444            }
445        }
446    }
447
448    Ok((limit, offset))
449}
450
451fn parse_order_by(
452    args: &ObjectAccessor,
453    cube: &CubeDefinition,
454    allowed_keys: &HashMap<String, String>,
455) -> Result<Vec<OrderExpr>, async_graphql::Error> {
456    let order_val = match args.try_get("orderBy") {
457        Ok(v) => v,
458        Err(_) => return Ok(Vec::new()),
459    };
460
461    let obj = order_val.object()
462        .map_err(|_| async_graphql::Error::new("orderBy must be an object"))?;
463    let flat = cube.flat_dimensions();
464
465    if let Ok(field) = obj.try_get("descending") {
466        let path = field.enum_name()
467            .map_err(|_| async_graphql::Error::new("orderBy.descending must be an enum value"))?;
468        let column = flat.iter()
469            .find(|(p, _)| p == path)
470            .map(|(_, dim)| dim.column.clone())
471            .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
472        return Ok(vec![OrderExpr { column, descending: true }]);
473    }
474
475    if let Ok(field) = obj.try_get("ascending") {
476        let path = field.enum_name()
477            .map_err(|_| async_graphql::Error::new("orderBy.ascending must be an enum value"))?;
478        let column = flat.iter()
479            .find(|(p, _)| p == path)
480            .map(|(_, dim)| dim.column.clone())
481            .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
482        return Ok(vec![OrderExpr { column, descending: false }]);
483    }
484
485    if let Ok(field_str) = obj.try_get("descendingByField") {
486        let name = field_str.string()
487            .map_err(|_| async_graphql::Error::new("descendingByField must be a string"))?;
488        let column = resolve_field_in_keys(name, allowed_keys)?;
489        return Ok(vec![OrderExpr { column, descending: true }]);
490    }
491
492    if let Ok(field_str) = obj.try_get("ascendingByField") {
493        let name = field_str.string()
494            .map_err(|_| async_graphql::Error::new("ascendingByField must be a string"))?;
495        let column = resolve_field_in_keys(name, allowed_keys)?;
496        return Ok(vec![OrderExpr { column, descending: false }]);
497    }
498
499    Ok(vec![])
500}
501
502/// Resolve a field reference against the unified allowed_keys registry.
503/// Tries the name as-is, then with metric/dim_agg prefixes.
504fn resolve_field_in_keys(
505    name: &str,
506    allowed_keys: &HashMap<String, String>,
507) -> Result<String, async_graphql::Error> {
508    if let Some(expr) = allowed_keys.get(name) { return Ok(expr.clone()); }
509    Err(async_graphql::Error::new(format!(
510        "Can't use '{name}' in sorting/ordering. Field not found in executed query."
511    )))
512}
513
/// Build unified allowed_keys from finalized selects.
/// Maps user-facing reference names → SQL column/expression.
/// Follows Bitquery convention for `descendingByField`:
///   - `{alias}` for metrics/dim aggs (e.g. "count", "high")
///   - `{parent}_{alias}_{suffix}` for dim aggs (e.g. "BalanceUpdate_Balance_maximum")
///   - `{parent}_{alias}` for time intervals (e.g. "Block_Timefield")
fn collect_select_keys(
    selects: &[SelectExpr],
    flat: &[(String, crate::cube::definition::Dimension)],
    field_aliases: &FieldAliasMap,
    dim_aggs: &[DimAggRequest],
    time_intervals: &[TimeIntervalRequest],
) -> HashMap<String, String> {
    let mut keys = HashMap::new();
    for sel in selects {
        match sel {
            // Aliased expression: register the internal alias, plus the
            // user-facing name with the "__da_"/"__" alias prefix stripped.
            SelectExpr::Column { column, alias: Some(a) } => {
                keys.insert(a.clone(), column.clone());
                if let Some(name) = a.strip_prefix("__da_") {
                    keys.insert(name.to_string(), column.clone());
                } else if let Some(name) = a.strip_prefix("__") {
                    keys.insert(name.to_string(), column.clone());
                }
            }
            // Plain column: addressable both by its dimension path (if any)
            // and by the raw column name.
            SelectExpr::Column { column, alias: None } => {
                if let Some((path, _)) = flat.iter().find(|(_, d)| d.column == *column) {
                    keys.insert(path.clone(), column.clone());
                }
                keys.insert(column.clone(), column.clone());
            }
            // Aggregates map to their full SQL expression, under both the
            // internal alias and its unprefixed form.
            SelectExpr::Aggregate { alias, function, column, .. } => {
                let expr = format_agg_sql(function, column);
                keys.insert(alias.clone(), expr.clone());
                if let Some(name) = alias.strip_prefix("__") {
                    keys.insert(name.to_string(), expr);
                }
            }
            SelectExpr::DimAggregate { alias, agg_type, value_column, compare_column, .. } => {
                let expr = format_dim_agg_sql(agg_type, value_column, compare_column);
                keys.insert(alias.clone(), expr.clone());
                if let Some(name) = alias.strip_prefix("__da_") {
                    keys.insert(name.to_string(), expr);
                }
            }
        }
    }
    // Bitquery convention: dim agg references — both with and without _maximum/_minimum suffix.
    // `entry(..).or_insert_with` throughout: select-derived keys above take priority.
    for da in dim_aggs {
        let suffix = match da.agg_type { DimAggType::ArgMax => "maximum", DimAggType::ArgMin => "minimum" };
        let expr = format_dim_agg_sql(&da.agg_type, &da.value_column, &da.compare_column);
        // {alias}_{suffix} — e.g. "QuotePostAmount_maximum"
        keys.entry(format!("{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
        // {field_path}_{suffix} — e.g. "Pool_Quote_PostAmount_maximum"
        keys.entry(format!("{}_{suffix}", da.field_path)).or_insert_with(|| expr.clone());
        if let Some(i) = da.field_path.rfind('_') {
            let parent = &da.field_path[..i];
            // {parent}_{alias}_{suffix} — e.g. "Pool_Quote_QuotePostAmount_maximum"
            keys.entry(format!("{parent}_{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
            // {parent}_{alias} (no suffix) — for calculate $variable refs, e.g. "Trade_CurrentPrice"
            keys.entry(format!("{parent}_{}", da.graphql_alias)).or_insert_with(|| expr.clone());
        }
    }
    // Bitquery convention: time interval → {parent}_{alias}
    for ti in time_intervals {
        let expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
        if let Some(i) = ti.field_path.rfind('_') {
            let parent = &ti.field_path[..i];
            keys.entry(format!("{parent}_{}", ti.graphql_alias)).or_insert_with(|| expr);
        }
    }
    // Schema-level field aliases come last and never override earlier keys.
    for (alias_path, column) in field_aliases {
        keys.entry(alias_path.clone()).or_insert_with(|| format!("`{column}`"));
    }
    keys
}
589
/// Render an aggregate select as ClickHouse SQL: `count()` for COUNT(*),
/// `uniq(col)` for UNIQ, otherwise `lowercased_func(col)`. Columns that
/// already contain '(' are treated as expressions and left unquoted.
fn format_agg_sql(function: &str, column: &str) -> String {
    let upper = function.to_uppercase();
    let target = if column.contains('(') {
        column.to_string()
    } else {
        format!("`{column}`")
    };
    if upper == "COUNT" && column == "*" {
        return "count()".to_string();
    }
    if upper == "UNIQ" {
        return format!("uniq({target})");
    }
    format!("{}({target})", upper.to_lowercase())
}
599
600fn format_dim_agg_sql(agg_type: &DimAggType, value_column: &str, compare_column: &str) -> String {
601    let func = match agg_type { DimAggType::ArgMax => "argMax", DimAggType::ArgMin => "argMin" };
602    let qval = if value_column.contains('(') { value_column.to_string() } else { format!("`{value_column}`") };
603    let qcmp = if compare_column.contains('(') { compare_column.to_string() } else { format!("`{compare_column}`") };
604    format!("{func}({qval}, {qcmp})")
605}
606
607/// Resolve `$variable` references in a calculate expression using allowed_keys.
608fn resolve_calculate_expr(expression: &str, allowed_keys: &HashMap<String, String>) -> String {
609    let mut result = String::new();
610    let mut chars = expression.chars().peekable();
611    while let Some(ch) = chars.next() {
612        if ch == '$' {
613            let mut var_name = String::new();
614            while let Some(&c) = chars.peek() {
615                if c.is_alphanumeric() || c == '_' {
616                    var_name.push(c);
617                    chars.next();
618                } else {
619                    break;
620                }
621            }
622            if !var_name.is_empty() {
623                if let Some(resolved) = allowed_keys.get(&var_name) {
624                    let col_ref = if resolved.contains('(') { resolved.clone() } else { format!("`{resolved}`") };
625                    result.push_str(&format!("toFloat64({col_ref})"));
626                } else {
627                    result.push_str(&format!("toFloat64(`{}`)", metric_key(&var_name)));
628                }
629            } else {
630                result.push('$');
631            }
632        } else {
633            result.push(ch);
634        }
635    }
636    result
637}
638
/// Build a ClickHouse `toStartOfInterval` bucketing expression for a time
/// column. Unrecognised units fall back to MINUTE.
fn time_interval_sql(column: &str, unit: &str, count: i64) -> String {
    const UNITS: [(&str, &str); 6] = [
        ("seconds", "SECOND"),
        ("minutes", "MINUTE"),
        ("hours", "HOUR"),
        ("days", "DAY"),
        ("weeks", "WEEK"),
        ("months", "MONTH"),
    ];
    let unit_sql = UNITS
        .iter()
        .find(|(name, _)| *name == unit)
        .map(|(_, sql)| *sql)
        .unwrap_or("MINUTE");
    format!("toStartOfInterval(`{column}`, INTERVAL {count} {unit_sql})")
}
646
647/// Compile a FilterNode into an inline SQL fragment (no parameterized bindings).
648/// Used for embedding conditions inside aggregate functions (countIf, sumIf).
649fn compile_filter_inline(node: &FilterNode) -> String {
650    match node {
651        FilterNode::Empty => String::new(),
652        FilterNode::Condition { column, op, value } => {
653            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
654            if op.is_unary() {
655                return format!("{col} {}", op.sql_op());
656            }
657            let val_str = match value {
658                SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
659                SqlValue::Int(i) => i.to_string(),
660                SqlValue::Float(f) => f.to_string(),
661                SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
662                SqlValue::Expression(e) => e.clone(),
663            };
664            match op {
665                CompareOp::In | CompareOp::NotIn => {
666                    if let SqlValue::String(csv) = value {
667                        let items: Vec<String> = csv.split(',')
668                            .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
669                            .collect();
670                        format!("{col} {} ({})", op.sql_op(), items.join(", "))
671                    } else {
672                        format!("{col} {} ({val_str})", op.sql_op())
673                    }
674                }
675                CompareOp::Includes => {
676                    let like_val = match value {
677                        SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
678                        _ => val_str,
679                    };
680                    format!("{col} LIKE {like_val}")
681                }
682                _ => format!("{col} {} {val_str}", op.sql_op()),
683            }
684        }
685        FilterNode::And(children) => {
686            let parts: Vec<String> = children.iter()
687                .map(compile_filter_inline)
688                .filter(|s| !s.is_empty())
689                .collect();
690            match parts.len() {
691                0 => String::new(),
692                1 => parts.into_iter().next().unwrap(),
693                _ => format!("({})", parts.join(" AND ")),
694            }
695        }
696        FilterNode::Or(children) => {
697            let parts: Vec<String> = children.iter()
698                .map(compile_filter_inline)
699                .filter(|s| !s.is_empty())
700                .collect();
701            match parts.len() {
702                0 => String::new(),
703                1 => parts.into_iter().next().unwrap(),
704                _ => format!("({})", parts.join(" OR ")),
705            }
706        }
707        FilterNode::ArrayIncludes { .. } => {
708            // ArrayIncludes is compiled by the SQL dialect, not inline.
709            // In conditional aggregation context, this is not expected.
710            String::new()
711        }
712    }
713}
714
715/// Walk a HAVING FilterNode and append any referenced aggregate columns that
716/// are missing from `selects`. The SQL dialect will assign aliases later.
717fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
718    let cols = collect_having_columns(having);
719    for col in cols {
720        if !col.contains('(') {
721            continue;
722        }
723        let already_present = selects.iter().any(|s| match s {
724            SelectExpr::Column { column, .. } => column == &col,
725            _ => false,
726        });
727        if !already_present {
728            selects.push(SelectExpr::Column {
729                column: col,
730                alias: None,
731            });
732        }
733    }
734}
735
736fn collect_having_columns(node: &FilterNode) -> Vec<String> {
737    match node {
738        FilterNode::Empty => vec![],
739        FilterNode::Condition { column, .. } => vec![column.clone()],
740        FilterNode::And(children) | FilterNode::Or(children) => {
741            children.iter().flat_map(collect_having_columns).collect()
742        }
743        FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
744    }
745}
746
/// Detect if a column expression is an aggregate function call.
/// Matches patterns like `argMaxMerge(...)`, `countMerge(...)`, `sumMerge(...)`, etc.
fn is_aggregate_column(column: &str) -> bool {
    // Any '(' implies a function-call expression rather than a bare column.
    column.find('(').is_some()
}
752
/// Walk a FilterNode tree and split it into (where_part, having_part).
/// Leaf conditions on aggregate columns go to HAVING; everything else stays in WHERE.
fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
    match node {
        FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
        // A leaf lands wholly on one side, keyed on whether its column looks
        // like an aggregate expression (contains '(').
        FilterNode::Condition { ref column, .. } => {
            if is_aggregate_column(column) {
                (FilterNode::Empty, node)
            } else {
                (node, FilterNode::Empty)
            }
        }
        // AND distributes over the split: each child goes to its own side and
        // each side's conjunction is rebuilt (drop empties, unwrap singletons).
        FilterNode::And(children) => {
            let mut where_parts = Vec::new();
            let mut having_parts = Vec::new();
            for child in children {
                let (w, h) = split_aggregate_filters(child);
                if !w.is_empty() { where_parts.push(w); }
                if !h.is_empty() { having_parts.push(h); }
            }
            let where_node = match where_parts.len() {
                0 => FilterNode::Empty,
                1 => where_parts.into_iter().next().unwrap(),
                _ => FilterNode::And(where_parts),
            };
            let having_node = match having_parts.len() {
                0 => FilterNode::Empty,
                1 => having_parts.into_iter().next().unwrap(),
                _ => FilterNode::And(having_parts),
            };
            (where_node, having_node)
        }
        // OR cannot be split without changing semantics, so the whole
        // disjunction moves to HAVING if ANY branch touches an aggregate,
        // and stays intact in WHERE otherwise.
        FilterNode::Or(children) => {
            let any_aggregate = children.iter().any(filter_has_aggregate);
            if any_aggregate {
                (FilterNode::Empty, FilterNode::Or(children))
            } else {
                (FilterNode::Or(children), FilterNode::Empty)
            }
        }
        FilterNode::ArrayIncludes { .. } => {
            // ArrayIncludes always goes to WHERE, never HAVING
            (node, FilterNode::Empty)
        }
    }
}
799
800fn filter_has_aggregate(node: &FilterNode) -> bool {
801    match node {
802        FilterNode::Empty => false,
803        FilterNode::Condition { column, .. } => is_aggregate_column(column),
804        FilterNode::And(children) | FilterNode::Or(children) => {
805            children.iter().any(filter_has_aggregate)
806        }
807        FilterNode::ArrayIncludes { .. } => false,
808    }
809}
810
811fn parse_limit_by(
812    args: &ObjectAccessor,
813    cube: &CubeDefinition,
814) -> Result<Option<LimitByExpr>, async_graphql::Error> {
815    let lb_val = match args.try_get("limitBy") {
816        Ok(v) => v,
817        Err(_) => return Ok(None),
818    };
819    let lb_obj = lb_val.object()?;
820    let count = lb_obj.try_get("count")?.i64()? as u32;
821    let offset = lb_obj
822        .try_get("offset")
823        .ok()
824        .and_then(|v| v.i64().ok())
825        .unwrap_or(0) as u32;
826    let by_val = lb_obj.try_get("by")?;
827    let by_str = by_val.enum_name()?;
828
829    let flat = cube.flat_dimensions();
830    let column = flat.iter()
831        .find(|(path, _)| path == by_str)
832        .map(|(_, dim)| dim.column.clone())
833        .ok_or_else(|| async_graphql::Error::new(
834            format!("Unknown limitBy field: {by_str}")
835        ))?;
836
837    Ok(Some(LimitByExpr { count, offset, columns: vec![column] }))
838}