Skip to main content

activecube_rs/compiler/
parser.rs

1use std::collections::{HashMap, HashSet};
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8use crate::schema::generator::{
9    CalculateRequest, DimAggRequest, FieldAliasMap, QuantileRequest, TimeIntervalRequest,
10    metric_key, dim_agg_key,
11};
12
/// Describes a metric requested in the GraphQL selection set.
pub struct MetricRequest {
    /// Aggregate function name as written in the query. Looked up against the
    /// cube's metric definitions first; otherwise uppercased into a plain SQL
    /// aggregate (see the metrics loop in `parse_cube_query`).
    pub function: String,
    /// GraphQL alias (e.g. `sum_in` in `sum_in: sum(...)`). Falls back to function name.
    pub alias: String,
    /// Flattened dimension path the aggregate runs over; resolved to its
    /// ClickHouse column, with `*` as the fallback when the path is unknown.
    pub of_dimension: String,
    /// The raw selectWhere value extracted from GraphQL arguments.
    pub select_where_value: Option<async_graphql::Value>,
    /// Pre-parsed condition filter for conditional aggregation (countIf/sumIf).
    pub condition_filter: Option<FilterNode>,
}
24
/// Translate a parsed GraphQL cube query into a backend-agnostic [`QueryIR`].
///
/// Pipeline, in order: resolve table from the requested columns → build WHERE
/// filters (`where` arg, selector args, chain column, cube defaults) → build
/// the SELECT list (dimensions, array columns, time-interval buckets) → split
/// aggregate conditions into HAVING → add metrics / dim aggs / quantiles and
/// GROUP BY → resolve `calculate` expressions → ORDER BY / LIMIT BY.
///
/// # Errors
/// Propagates parse errors from the filter, limit, and orderBy sub-parsers.
#[allow(clippy::too_many_arguments)]
pub fn parse_cube_query(
    cube: &CubeDefinition,
    network: &str,
    args: &ObjectAccessor,
    metrics: &[MetricRequest],
    quantiles: &[QuantileRequest],
    calculates: &[CalculateRequest],
    field_aliases: &FieldAliasMap,
    dim_aggs: &[DimAggRequest],
    time_intervals: &[TimeIntervalRequest],
    requested_fields: Option<HashSet<String>>,
) -> Result<QueryIR, async_graphql::Error> {
    // Table resolution may depend on which physical columns the query needs
    // (requested_fields == None means "everything was requested").
    let flat = cube.flat_dimensions();
    let requested_cols: Vec<String> = flat.iter()
        .filter(|(path, _)| {
            requested_fields.as_ref().is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| dim.column.clone())
        .collect();
    let (schema, table) = cube.resolve_table(network, &requested_cols);

    // Base filters from the `where` argument; absent or non-object → Empty.
    let filters = if let Ok(where_val) = args.try_get("where") {
        if let Ok(where_obj) = where_val.object() {
            filter::parse_where(&where_obj, &cube.dimensions)?
        } else {
            FilterNode::Empty
        }
    } else {
        FilterNode::Empty
    };

    // Layer on top-level selector arguments, then pin the network via the
    // chain column (if the cube stores multiple chains in one table), then
    // the cube's static default filters.
    let filters = merge_selector_filters(filters, args, &cube.selectors)?;
    let filters = if let Some(ref chain_col) = cube.chain_column {
        let chain_filter = FilterNode::Condition {
            column: chain_col.clone(),
            op: CompareOp::Eq,
            value: SqlValue::String(network.to_string()),
        };
        if filters.is_empty() {
            chain_filter
        } else {
            FilterNode::And(vec![chain_filter, filters])
        }
    } else {
        filters
    };
    let filters = apply_default_filters(filters, &cube.default_filters);
    let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;

    // SELECT list starts from the requested scalar dimensions.
    let mut selects: Vec<SelectExpr> = flat
        .iter()
        .filter(|(path, _)| {
            requested_fields
                .as_ref()
                .is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // Include Array columns when the GraphQL selection requests them.
    // Array dimensions are not in flat_dimensions(), but their parallel
    // ClickHouse columns must appear in SELECT for the resolver to work.
    let array_cols = cube.array_columns();
    if !array_cols.is_empty() {
        let selected_cols: HashSet<String> = selects.iter()
            .filter_map(|s| match s {
                SelectExpr::Column { column, .. } => Some(column.clone()),
                _ => None,
            })
            .collect();
        for (path, col) in &array_cols {
            if selected_cols.contains(col) {
                continue;
            }
            let should_include = requested_fields.as_ref().is_none_or(|rf| {
                // Array path like "Instruction_Program_Arguments_Name".
                // The parent prefix (e.g. "Instruction_Program_Arguments") must
                // be a prefix of at least one requested field path for us to
                // include this column.
                let parent = path.rsplit_once('_').map(|(p, _)| p).unwrap_or(path);
                rf.iter().any(|f| f.starts_with(parent))
            });
            if should_include {
                selects.push(SelectExpr::Column {
                    column: col.clone(),
                    alias: None,
                });
            }
        }
    }

    // Safety net: a pure-dimension query that selected nothing falls back to
    // selecting every flat dimension (only when no aggregation is requested).
    if selects.is_empty() && !flat.is_empty() && metrics.is_empty() && dim_aggs.is_empty() {
        selects = flat
            .iter()
            .map(|(_, dim)| SelectExpr::Column {
                column: dim.column.clone(),
                alias: None,
            })
            .collect();
    }

    // --- Time interval bucketing: replace raw column with toStartOfInterval ---
    // Only the first matching SELECT column is rewritten (note the `break`).
    for ti in time_intervals {
        let interval_expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
        let alias = dim_agg_key(&ti.graphql_alias);
        for sel in &mut selects {
            if let SelectExpr::Column { column, alias: ref mut a } = sel {
                if column == &ti.column {
                    *column = interval_expr.clone();
                    *a = Some(alias.clone());
                    break;
                }
            }
        }
    }

    // User filters on aggregate expressions must run post-aggregation.
    let (filters, agg_having) = split_aggregate_filters(filters);
    let mut group_by = Vec::new();
    let mut having = agg_having;

    // --- Aggregation mode: triggered by metrics, dim aggs, or quantiles ---
    if !metrics.is_empty() || !dim_aggs.is_empty() {
        // Columns consumed by argMax/argMin must not appear in GROUP BY.
        let agg_columns: HashSet<String> = dim_aggs.iter()
            .map(|da| da.value_column.clone())
            .collect();

        group_by = selects
            .iter()
            .filter_map(|s| match s {
                SelectExpr::Column { column, .. } if !agg_columns.contains(column) => {
                    Some(column.clone())
                }
                _ => None,
            })
            .collect();

        for da in dim_aggs {
            // The raw value column is replaced by the argMax/argMin expression.
            selects.retain(|s| !matches!(s, SelectExpr::Column { column, .. } if column == &da.value_column));
            let alias = dim_agg_key(&da.graphql_alias);
            let condition = da.condition_filter.as_ref().and_then(|f| {
                let sql = compile_filter_inline(f);
                if sql.is_empty() { None } else { Some(sql) }
            });
            let func_name = match da.agg_type {
                DimAggType::ArgMax => "argMax",
                DimAggType::ArgMin => "argMin",
            };

            selects.push(SelectExpr::DimAggregate {
                agg_type: da.agg_type.clone(),
                value_column: da.value_column.clone(),
                compare_column: da.compare_column.clone(),
                alias: alias.clone(),
                condition,
            });

            // Per-dim-agg selectWhere becomes a HAVING condition on the
            // full aggregate expression.
            if let Some(async_graphql::Value::Object(ref obj)) = da.select_where_value {
                let agg_expr = format!("{func_name}(`{}`, `{}`)", da.value_column, da.compare_column);
                let h = parse_select_where_from_value(obj, &agg_expr)?;
                if !h.is_empty() {
                    having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
                }
            }
        }

        for m in metrics {
            // Unknown dimension paths degrade to `*` (useful for count()).
            let dim_col = flat.iter()
                .find(|(path, _)| path == &m.of_dimension)
                .map(|(_, dim)| dim.column.clone())
                .unwrap_or_else(|| "*".to_string());
            let alias = metric_key(&m.alias);
            let metric_def = cube.find_metric(&m.function);

            if let Some(md) = metric_def.filter(|md| md.expression_template.is_some()) {
                // Cube-defined metric with a SQL template: expand `{column}`.
                let tmpl = md.expression_template.as_ref().unwrap();
                let expanded = tmpl.replace("{column}", &dim_col);
                selects.push(SelectExpr::Column { column: expanded, alias: Some(alias) });
            } else {
                let func = m.function.to_uppercase();
                let condition = m.condition_filter.as_ref().and_then(|f| {
                    let sql = compile_filter_inline(f);
                    if sql.is_empty() { None } else { Some(sql) }
                });
                selects.push(SelectExpr::Aggregate {
                    function: func.clone(), column: dim_col.clone(),
                    alias: alias.clone(), condition,
                });
                if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
                    // NOTE(review): COUNT over a concrete column is rendered
                    // here as COUNT(DISTINCT) for the HAVING expression, same
                    // as UNIQ — confirm this matches the dialect's SELECT
                    // rendering of the aggregate.
                    let agg_expr = if func == "COUNT" && dim_col == "*" { "COUNT(*)".into() }
                        else if func == "COUNT" || func == "UNIQ" { format!("COUNT(DISTINCT `{dim_col}`)") }
                        else { format!("{func}(`{dim_col}`)") };
                    let h = parse_select_where_from_value(obj, &agg_expr)?;
                    if !h.is_empty() {
                        having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
                    }
                }
            }
        }
    }

    // Quantiles piggyback on aggregation; if no GROUP BY was set yet, derive
    // one from the remaining plain (non-expression, un-aliased) columns.
    for q in quantiles {
        let dim_col = flat.iter()
            .find(|(path, _)| path == &q.of_dimension)
            .map(|(_, dim)| dim.column.clone())
            .unwrap_or_else(|| "*".to_string());
        let alias = metric_key(&q.alias);
        let expr = format!("quantile({})(`{}`)", q.level, dim_col);
        selects.push(SelectExpr::Column { column: expr, alias: Some(alias) });
        if group_by.is_empty() && !selects.iter().any(|s| matches!(s, SelectExpr::Aggregate { .. })) {
            group_by = selects.iter().filter_map(|s| match s {
                SelectExpr::Column { column, alias } if alias.is_none() && !column.contains('(') => Some(column.clone()),
                _ => None,
            }).collect();
        }
    }

    // --- Build unified allowed_keys from finalized selects (Bitquery pattern) ---
    let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);

    // `calculate` fields reference other selects via $variables; ifNotFinite
    // guards against division by zero producing inf/nan.
    for calc in calculates {
        let alias = metric_key(&calc.alias);
        let resolved = resolve_calculate_expr(&calc.expression, &allowed_keys);
        selects.push(SelectExpr::Column {
            column: format!("ifNotFinite(({resolved}), 0)"),
            alias: Some(alias),
        });
    }

    ensure_having_columns_in_selects(&having, &mut selects);

    // Recompute keys: calculates/having may have added selects orderBy can use.
    let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);
    let order_by = parse_order_by(args, cube, &allowed_keys)?;
    let limit_by = parse_limit_by(args, cube)?;

    let from_subquery = cube.from_subquery.as_ref().map(|s| {
        s.replace("{schema}", &schema).replace("{chain}", network)
    });

    Ok(QueryIR {
        cube: cube.name.clone(),
        schema,
        table,
        selects,
        filters,
        having,
        group_by,
        order_by,
        limit,
        offset,
        limit_by,
        use_final: cube.use_final,
        joins: Vec::new(),
        custom_query_builder: cube.custom_query_builder.clone(),
        from_subquery,
    })
}
285
286/// Parse a selectWhere value object (from GraphQL Value, not ObjectAccessor)
287/// into a HAVING FilterNode.
288fn parse_select_where_from_value(
289    obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
290    aggregate_expr: &str,
291) -> Result<FilterNode, async_graphql::Error> {
292    let mut conditions = Vec::new();
293
294    for (key, op) in &[
295        ("eq", CompareOp::Eq),
296        ("ne", CompareOp::Ne),
297        ("gt", CompareOp::Gt),
298        ("ge", CompareOp::Ge),
299        ("lt", CompareOp::Lt),
300        ("le", CompareOp::Le),
301    ] {
302        if let Some(val) = obj.get(*key) {
303            let sql_val = match val {
304                async_graphql::Value::String(s) => {
305                    if let Ok(f) = s.parse::<f64>() {
306                        SqlValue::Float(f)
307                    } else {
308                        SqlValue::String(s.clone())
309                    }
310                }
311                async_graphql::Value::Number(n) => {
312                    if let Some(f) = n.as_f64() {
313                        SqlValue::Float(f)
314                    } else {
315                        SqlValue::Int(n.as_i64().unwrap_or(0))
316                    }
317                }
318                _ => continue,
319            };
320            conditions.push(FilterNode::Condition {
321                column: aggregate_expr.to_string(),
322                op: op.clone(),
323                value: sql_val,
324            });
325        }
326    }
327
328    Ok(match conditions.len() {
329        0 => FilterNode::Empty,
330        1 => conditions.into_iter().next().unwrap(),
331        _ => FilterNode::And(conditions),
332    })
333}
334
335fn merge_selector_filters(
336    base: FilterNode,
337    args: &ObjectAccessor,
338    selectors: &[SelectorDef],
339) -> Result<FilterNode, async_graphql::Error> {
340    let mut extra = Vec::new();
341
342    for sel in selectors {
343        if let Ok(val) = args.try_get(&sel.graphql_name) {
344            if let Ok(obj) = val.object() {
345                let leaf_filters =
346                    filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
347                extra.extend(leaf_filters);
348            }
349        }
350    }
351
352    if extra.is_empty() {
353        return Ok(base);
354    }
355    if base.is_empty() {
356        return Ok(if extra.len() == 1 {
357            extra.remove(0)
358        } else {
359            FilterNode::And(extra)
360        });
361    }
362    extra.push(base);
363    Ok(FilterNode::And(extra))
364}
365
366fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
367    if defaults.is_empty() {
368        return user_filters;
369    }
370
371    let mut default_nodes: Vec<FilterNode> = defaults
372        .iter()
373        .map(|(col, val)| {
374            let sql_val = if val == "true" || val == "false" {
375                SqlValue::Bool(val == "true")
376            } else if let Ok(n) = val.parse::<i64>() {
377                SqlValue::Int(n)
378            } else {
379                SqlValue::String(val.clone())
380            };
381            FilterNode::Condition {
382                column: col.clone(),
383                op: CompareOp::Eq,
384                value: sql_val,
385            }
386        })
387        .collect();
388
389    if user_filters.is_empty() {
390        if default_nodes.len() == 1 {
391            return default_nodes.remove(0);
392        }
393        return FilterNode::And(default_nodes);
394    }
395
396    default_nodes.push(user_filters);
397    FilterNode::And(default_nodes)
398}
399
400fn parse_limit(
401    args: &ObjectAccessor,
402    default: u32,
403    max: u32,
404) -> Result<(u32, u32), async_graphql::Error> {
405    let mut limit = default;
406    let mut offset = 0u32;
407
408    if let Ok(limit_val) = args.try_get("limit") {
409        if let Ok(limit_obj) = limit_val.object() {
410            if let Ok(count) = limit_obj.try_get("count") {
411                limit = (count.i64()? as u32).min(max);
412            }
413            if let Ok(off) = limit_obj.try_get("offset") {
414                offset = off.i64()? as u32;
415            }
416        }
417    }
418
419    Ok((limit, offset))
420}
421
422fn parse_order_by(
423    args: &ObjectAccessor,
424    cube: &CubeDefinition,
425    allowed_keys: &HashMap<String, String>,
426) -> Result<Vec<OrderExpr>, async_graphql::Error> {
427    let order_val = match args.try_get("orderBy") {
428        Ok(v) => v,
429        Err(_) => return Ok(Vec::new()),
430    };
431
432    let obj = order_val.object()
433        .map_err(|_| async_graphql::Error::new("orderBy must be an object"))?;
434    let flat = cube.flat_dimensions();
435
436    if let Ok(field) = obj.try_get("descending") {
437        let path = field.enum_name()
438            .map_err(|_| async_graphql::Error::new("orderBy.descending must be an enum value"))?;
439        let column = flat.iter()
440            .find(|(p, _)| p == path)
441            .map(|(_, dim)| dim.column.clone())
442            .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
443        return Ok(vec![OrderExpr { column, descending: true }]);
444    }
445
446    if let Ok(field) = obj.try_get("ascending") {
447        let path = field.enum_name()
448            .map_err(|_| async_graphql::Error::new("orderBy.ascending must be an enum value"))?;
449        let column = flat.iter()
450            .find(|(p, _)| p == path)
451            .map(|(_, dim)| dim.column.clone())
452            .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
453        return Ok(vec![OrderExpr { column, descending: false }]);
454    }
455
456    if let Ok(field_str) = obj.try_get("descendingByField") {
457        let name = field_str.string()
458            .map_err(|_| async_graphql::Error::new("descendingByField must be a string"))?;
459        let column = resolve_field_in_keys(name, allowed_keys)?;
460        return Ok(vec![OrderExpr { column, descending: true }]);
461    }
462
463    if let Ok(field_str) = obj.try_get("ascendingByField") {
464        let name = field_str.string()
465            .map_err(|_| async_graphql::Error::new("ascendingByField must be a string"))?;
466        let column = resolve_field_in_keys(name, allowed_keys)?;
467        return Ok(vec![OrderExpr { column, descending: false }]);
468    }
469
470    Ok(vec![])
471}
472
473/// Resolve a field reference against the unified allowed_keys registry.
474/// Tries the name as-is, then with metric/dim_agg prefixes.
475fn resolve_field_in_keys(
476    name: &str,
477    allowed_keys: &HashMap<String, String>,
478) -> Result<String, async_graphql::Error> {
479    if let Some(expr) = allowed_keys.get(name) { return Ok(expr.clone()); }
480    Err(async_graphql::Error::new(format!(
481        "Can't use '{name}' in sorting/ordering. Field not found in executed query."
482    )))
483}
484
/// Build unified allowed_keys from finalized selects.
/// Maps user-facing reference names → SQL column/expression.
/// Follows Bitquery convention for `descendingByField`:
///   - `{alias}` for metrics/dim aggs (e.g. "count", "high")
///   - `{parent}_{alias}_{suffix}` for dim aggs (e.g. "BalanceUpdate_Balance_maximum")
///   - `{parent}_{alias}` for time intervals (e.g. "Block_Timefield")
///
/// Entries from the selects themselves are inserted first (and win);
/// convention-derived names use `entry().or_insert_with()` so they never
/// override an explicit select alias.
fn collect_select_keys(
    selects: &[SelectExpr],
    flat: &[(String, crate::cube::definition::Dimension)],
    field_aliases: &FieldAliasMap,
    dim_aggs: &[DimAggRequest],
    time_intervals: &[TimeIntervalRequest],
) -> HashMap<String, String> {
    let mut keys = HashMap::new();
    for sel in selects {
        match sel {
            SelectExpr::Column { column, alias: Some(a) } => {
                // Register the full alias and, for internal prefixes
                // ("__da_" for dim aggs, "__" for metrics), the bare name too.
                keys.insert(a.clone(), column.clone());
                if let Some(name) = a.strip_prefix("__da_") {
                    keys.insert(name.to_string(), column.clone());
                } else if let Some(name) = a.strip_prefix("__") {
                    keys.insert(name.to_string(), column.clone());
                }
            }
            SelectExpr::Column { column, alias: None } => {
                // Plain dimension column: addressable by its dimension path
                // (if known) and by the raw column name.
                if let Some((path, _)) = flat.iter().find(|(_, d)| d.column == *column) {
                    keys.insert(path.clone(), column.clone());
                }
                keys.insert(column.clone(), column.clone());
            }
            SelectExpr::Aggregate { alias, function, column, .. } => {
                let expr = format_agg_sql(function, column);
                keys.insert(alias.clone(), expr.clone());
                if let Some(name) = alias.strip_prefix("__") {
                    keys.insert(name.to_string(), expr);
                }
            }
            SelectExpr::DimAggregate { alias, agg_type, value_column, compare_column, .. } => {
                let expr = format_dim_agg_sql(agg_type, value_column, compare_column);
                keys.insert(alias.clone(), expr.clone());
                if let Some(name) = alias.strip_prefix("__da_") {
                    keys.insert(name.to_string(), expr);
                }
            }
        }
    }
    // Bitquery convention: dim agg references — both with and without _maximum/_minimum suffix
    for da in dim_aggs {
        let suffix = match da.agg_type { DimAggType::ArgMax => "maximum", DimAggType::ArgMin => "minimum" };
        let expr = format_dim_agg_sql(&da.agg_type, &da.value_column, &da.compare_column);
        // {alias}_{suffix} — e.g. "QuotePostAmount_maximum"
        keys.entry(format!("{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
        // {field_path}_{suffix} — e.g. "Pool_Quote_PostAmount_maximum"
        keys.entry(format!("{}_{suffix}", da.field_path)).or_insert_with(|| expr.clone());
        if let Some(i) = da.field_path.rfind('_') {
            let parent = &da.field_path[..i];
            // {parent}_{alias}_{suffix} — e.g. "Pool_Quote_QuotePostAmount_maximum"
            keys.entry(format!("{parent}_{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
            // {parent}_{alias} (no suffix) — for calculate $variable refs, e.g. "Trade_CurrentPrice"
            keys.entry(format!("{parent}_{}", da.graphql_alias)).or_insert_with(|| expr.clone());
        }
    }
    // Bitquery convention: time interval → {parent}_{alias}
    for ti in time_intervals {
        let expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
        if let Some(i) = ti.field_path.rfind('_') {
            let parent = &ti.field_path[..i];
            keys.entry(format!("{parent}_{}", ti.graphql_alias)).or_insert_with(|| expr);
        }
    }
    // Schema-level field aliases resolve last and never shadow real selects.
    for (alias_path, column) in field_aliases {
        keys.entry(alias_path.clone()).or_insert_with(|| format!("`{column}`"));
    }
    keys
}
560
/// Render an aggregate as a ClickHouse SQL expression: `count()` for
/// COUNT(*), otherwise `lowercased_func(column)` with backtick quoting for
/// plain column names (already-parenthesized expressions pass through).
fn format_agg_sql(function: &str, column: &str) -> String {
    let upper = function.to_uppercase();
    if upper == "COUNT" && column == "*" {
        return "count()".to_string();
    }
    let quoted = if column.contains('(') {
        column.to_owned()
    } else {
        format!("`{column}`")
    };
    // UNIQ lowercases to "uniq", so the generic path covers it too.
    format!("{}({quoted})", upper.to_lowercase())
}
570
571fn format_dim_agg_sql(agg_type: &DimAggType, value_column: &str, compare_column: &str) -> String {
572    let func = match agg_type { DimAggType::ArgMax => "argMax", DimAggType::ArgMin => "argMin" };
573    let qval = if value_column.contains('(') { value_column.to_string() } else { format!("`{value_column}`") };
574    let qcmp = if compare_column.contains('(') { compare_column.to_string() } else { format!("`{compare_column}`") };
575    format!("{func}({qval}, {qcmp})")
576}
577
578/// Resolve `$variable` references in a calculate expression using allowed_keys.
579fn resolve_calculate_expr(expression: &str, allowed_keys: &HashMap<String, String>) -> String {
580    let mut result = String::new();
581    let mut chars = expression.chars().peekable();
582    while let Some(ch) = chars.next() {
583        if ch == '$' {
584            let mut var_name = String::new();
585            while let Some(&c) = chars.peek() {
586                if c.is_alphanumeric() || c == '_' {
587                    var_name.push(c);
588                    chars.next();
589                } else {
590                    break;
591                }
592            }
593            if !var_name.is_empty() {
594                if let Some(resolved) = allowed_keys.get(&var_name) {
595                    let col_ref = if resolved.contains('(') { resolved.clone() } else { format!("`{resolved}`") };
596                    result.push_str(&format!("toFloat64({col_ref})"));
597                } else {
598                    result.push_str(&format!("toFloat64(`{}`)", metric_key(&var_name)));
599                }
600            } else {
601                result.push('$');
602            }
603        } else {
604            result.push(ch);
605        }
606    }
607    result
608}
609
/// Build a ClickHouse `toStartOfInterval` bucketing expression for `column`.
/// Unrecognized unit names fall back to MINUTE.
fn time_interval_sql(column: &str, unit: &str, count: i64) -> String {
    const UNITS: [(&str, &str); 6] = [
        ("seconds", "SECOND"),
        ("minutes", "MINUTE"),
        ("hours", "HOUR"),
        ("days", "DAY"),
        ("weeks", "WEEK"),
        ("months", "MONTH"),
    ];
    let unit_sql = UNITS
        .iter()
        .find(|(name, _)| *name == unit)
        .map(|(_, sql)| *sql)
        .unwrap_or("MINUTE");
    format!("toStartOfInterval(`{column}`, INTERVAL {count} {unit_sql})")
}
617
/// Compile a FilterNode into an inline SQL fragment (no parameterized bindings).
/// Used for embedding conditions inside aggregate functions (countIf, sumIf).
///
/// Empty nodes (and ArrayIncludes, which only the dialect layer can render)
/// compile to the empty string; AND/OR drop empty children and only
/// parenthesize when two or more parts remain. String values are escaped by
/// backslash-quoting single quotes.
fn compile_filter_inline(node: &FilterNode) -> String {
    match node {
        FilterNode::Empty => String::new(),
        FilterNode::Condition { column, op, value } => {
            // Parenthesized expressions pass through; bare names get backticks.
            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
            if op.is_unary() {
                // Operators like IS NULL take no right-hand value.
                return format!("{col} {}", op.sql_op());
            }
            let val_str = match value {
                SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
                SqlValue::Int(i) => i.to_string(),
                SqlValue::Float(f) => f.to_string(),
                SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
                SqlValue::Expression(e) => e.clone(),
            };
            match op {
                CompareOp::In | CompareOp::NotIn => {
                    // A string value is treated as a comma-separated list and
                    // expanded into individually quoted items.
                    if let SqlValue::String(csv) = value {
                        let items: Vec<String> = csv.split(',')
                            .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
                            .collect();
                        format!("{col} {} ({})", op.sql_op(), items.join(", "))
                    } else {
                        format!("{col} {} ({val_str})", op.sql_op())
                    }
                }
                CompareOp::Includes => {
                    // Substring match: wrap string values in %...% for LIKE.
                    let like_val = match value {
                        SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
                        _ => val_str,
                    };
                    format!("{col} LIKE {like_val}")
                }
                _ => format!("{col} {} {val_str}", op.sql_op()),
            }
        }
        FilterNode::And(children) => {
            let parts: Vec<String> = children.iter()
                .map(compile_filter_inline)
                .filter(|s| !s.is_empty())
                .collect();
            match parts.len() {
                0 => String::new(),
                1 => parts.into_iter().next().unwrap(),
                _ => format!("({})", parts.join(" AND ")),
            }
        }
        FilterNode::Or(children) => {
            let parts: Vec<String> = children.iter()
                .map(compile_filter_inline)
                .filter(|s| !s.is_empty())
                .collect();
            match parts.len() {
                0 => String::new(),
                1 => parts.into_iter().next().unwrap(),
                _ => format!("({})", parts.join(" OR ")),
            }
        }
        FilterNode::ArrayIncludes { .. } => {
            // ArrayIncludes is compiled by the SQL dialect, not inline.
            // In conditional aggregation context, this is not expected.
            String::new()
        }
    }
}
685
686/// Walk a HAVING FilterNode and append any referenced aggregate columns that
687/// are missing from `selects`. The SQL dialect will assign aliases later.
688fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
689    let cols = collect_having_columns(having);
690    for col in cols {
691        if !col.contains('(') {
692            continue;
693        }
694        let already_present = selects.iter().any(|s| match s {
695            SelectExpr::Column { column, .. } => column == &col,
696            _ => false,
697        });
698        if !already_present {
699            selects.push(SelectExpr::Column {
700                column: col,
701                alias: None,
702            });
703        }
704    }
705}
706
707fn collect_having_columns(node: &FilterNode) -> Vec<String> {
708    match node {
709        FilterNode::Empty => vec![],
710        FilterNode::Condition { column, .. } => vec![column.clone()],
711        FilterNode::And(children) | FilterNode::Or(children) => {
712            children.iter().flat_map(collect_having_columns).collect()
713        }
714        FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
715    }
716}
717
/// Detect if a column expression is a function call rather than a bare column
/// name — the heuristic is simply the presence of `(` (covers `argMaxMerge(...)`,
/// `countMerge(...)`, `sumMerge(...)`, etc.).
fn is_aggregate_column(column: &str) -> bool {
    column.bytes().any(|b| b == b'(')
}
723
724/// Walk a FilterNode tree and split it into (where_part, having_part).
725/// Leaf conditions on aggregate columns go to HAVING; everything else stays in WHERE.
726fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
727    match node {
728        FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
729        FilterNode::Condition { ref column, .. } => {
730            if is_aggregate_column(column) {
731                (FilterNode::Empty, node)
732            } else {
733                (node, FilterNode::Empty)
734            }
735        }
736        FilterNode::And(children) => {
737            let mut where_parts = Vec::new();
738            let mut having_parts = Vec::new();
739            for child in children {
740                let (w, h) = split_aggregate_filters(child);
741                if !w.is_empty() { where_parts.push(w); }
742                if !h.is_empty() { having_parts.push(h); }
743            }
744            let where_node = match where_parts.len() {
745                0 => FilterNode::Empty,
746                1 => where_parts.into_iter().next().unwrap(),
747                _ => FilterNode::And(where_parts),
748            };
749            let having_node = match having_parts.len() {
750                0 => FilterNode::Empty,
751                1 => having_parts.into_iter().next().unwrap(),
752                _ => FilterNode::And(having_parts),
753            };
754            (where_node, having_node)
755        }
756        FilterNode::Or(children) => {
757            let any_aggregate = children.iter().any(filter_has_aggregate);
758            if any_aggregate {
759                (FilterNode::Empty, FilterNode::Or(children))
760            } else {
761                (FilterNode::Or(children), FilterNode::Empty)
762            }
763        }
764        FilterNode::ArrayIncludes { .. } => {
765            // ArrayIncludes always goes to WHERE, never HAVING
766            (node, FilterNode::Empty)
767        }
768    }
769}
770
771fn filter_has_aggregate(node: &FilterNode) -> bool {
772    match node {
773        FilterNode::Empty => false,
774        FilterNode::Condition { column, .. } => is_aggregate_column(column),
775        FilterNode::And(children) | FilterNode::Or(children) => {
776            children.iter().any(filter_has_aggregate)
777        }
778        FilterNode::ArrayIncludes { .. } => false,
779    }
780}
781
782fn parse_limit_by(
783    args: &ObjectAccessor,
784    cube: &CubeDefinition,
785) -> Result<Option<LimitByExpr>, async_graphql::Error> {
786    let lb_val = match args.try_get("limitBy") {
787        Ok(v) => v,
788        Err(_) => return Ok(None),
789    };
790    let lb_obj = lb_val.object()?;
791    let count = lb_obj.try_get("count")?.i64()? as u32;
792    let offset = lb_obj
793        .try_get("offset")
794        .ok()
795        .and_then(|v| v.i64().ok())
796        .unwrap_or(0) as u32;
797    let by_val = lb_obj.try_get("by")?;
798    let by_str = by_val.enum_name()?;
799
800    let flat = cube.flat_dimensions();
801    let column = flat.iter()
802        .find(|(path, _)| path == by_str)
803        .map(|(_, dim)| dim.column.clone())
804        .ok_or_else(|| async_graphql::Error::new(
805            format!("Unknown limitBy field: {by_str}")
806        ))?;
807
808    Ok(Some(LimitByExpr { count, offset, columns: vec![column] }))
809}