Skip to main content

activecube_rs/compiler/
parser.rs

1use std::collections::HashSet;
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8
9/// Describes a metric requested in the GraphQL selection set.
/// Describes a metric requested in the GraphQL selection set.
pub struct MetricRequest {
    /// Metric name as requested (e.g. a cube-defined metric or a plain
    /// aggregate like "count"); looked up via `find_metric` and, when no
    /// expression template matches, uppercased into the SQL function name.
    pub function: String,
    /// Dimension path the metric aggregates over; paths that do not resolve
    /// against the cube's flat dimensions fall back to `*` at build time.
    pub of_dimension: String,
    /// The raw selectWhere value extracted from GraphQL arguments.
    pub select_where_value: Option<async_graphql::Value>,
    /// Pre-parsed condition filter for conditional aggregation (countIf/sumIf).
    pub condition_filter: Option<FilterNode>,
}
18
/// Translate the GraphQL arguments and requested metrics/fields of a cube
/// query into the backend-agnostic `QueryIR`.
///
/// * `cube` – static cube definition (dimensions, metrics, limits, table info).
/// * `network` – chain name; used for table resolution and, when the cube
///   declares a `chain_column`, injected as an equality filter.
/// * `args` – raw GraphQL field arguments (`where`, `limit`, `orderBy`, …).
/// * `metrics` – aggregations requested in the selection set.
/// * `requested_fields` – selected dimension paths; `None` means "all".
pub fn parse_cube_query(
    cube: &CubeDefinition,
    network: &str,
    args: &ObjectAccessor,
    metrics: &[MetricRequest],
    requested_fields: Option<HashSet<String>>,
) -> Result<QueryIR, async_graphql::Error> {
    // Columns actually requested — passed to resolve_table so it can pick a
    // table variant that contains them.
    let flat = cube.flat_dimensions();
    let requested_cols: Vec<String> = flat.iter()
        .filter(|(path, _)| {
            requested_fields.as_ref().is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| dim.column.clone())
        .collect();
    let (schema, table) = cube.resolve_table(network, &requested_cols);

    // User-supplied `where` argument; a missing or non-object value is
    // treated as "no filter" rather than an error.
    let filters = if let Ok(where_val) = args.try_get("where") {
        if let Ok(where_obj) = where_val.object() {
            filter::parse_where(&where_obj, &cube.dimensions)?
        } else {
            FilterNode::Empty
        }
    } else {
        FilterNode::Empty
    };

    // Top-level selector arguments contribute extra AND-ed leaf conditions.
    let filters = merge_selector_filters(filters, args, &cube.selectors)?;
    // For tables that use a chain column instead of chain-prefixed table names,
    // inject a WHERE chain = ? filter automatically.
    let filters = if let Some(ref chain_col) = cube.chain_column {
        let chain_filter = FilterNode::Condition {
            column: chain_col.clone(),
            op: CompareOp::Eq,
            value: SqlValue::String(network.to_string()),
        };
        if filters.is_empty() {
            chain_filter
        } else {
            FilterNode::And(vec![chain_filter, filters])
        }
    } else {
        filters
    };
    let filters = apply_default_filters(filters, &cube.default_filters);
    let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;
    let order_by = parse_order_by(args, cube)?;

    // One plain column select per requested dimension.
    let mut selects: Vec<SelectExpr> = flat
        .iter()
        .filter(|(path, _)| {
            requested_fields
                .as_ref()
                .is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // When only metrics are requested with no dimension fields, keep selects empty
    // so GROUP BY is also empty → produces a single aggregated row (e.g. total count).
    // Only fall back to all dimensions when there are NO metrics either (pure wildcard).
    if selects.is_empty() && !flat.is_empty() && metrics.is_empty() {
        selects = flat
            .iter()
            .map(|(_, dim)| SelectExpr::Column {
                column: dim.column.clone(),
                alias: None,
            })
            .collect();
    }

    // Split WHERE filters: conditions on aggregate columns (containing function
    // calls like argMaxMerge, countMerge, etc.) must go into HAVING, not WHERE.
    let (filters, agg_having) = split_aggregate_filters(filters);

    let mut group_by = Vec::new();
    let mut having = agg_having;

    if !metrics.is_empty() {
        // Group by every plain dimension column currently selected.
        group_by = selects
            .iter()
            .filter_map(|s| match s {
                SelectExpr::Column { column, .. } => Some(column.clone()),
                _ => None,
            })
            .collect();

        for m in metrics {
            // Resolve the metric's target dimension to its column; unknown
            // paths aggregate over `*` (e.g. COUNT(*)).
            let dim_col = flat
                .iter()
                .find(|(path, _)| path == &m.of_dimension)
                .map(|(_, dim)| dim.column.clone())
                .unwrap_or_else(|| "*".to_string());

            let alias = format!("__{}", m.function);
            let metric_def = cube.find_metric(&m.function);

            if let Some(md) = metric_def.filter(|md| md.expression_template.is_some()) {
                // Cube-defined metric with a raw SQL template: expand
                // `{column}` and select the expression under the alias.
                let tmpl = md.expression_template.as_ref().unwrap();
                let expanded = tmpl.replace("{column}", &dim_col);
                selects.push(SelectExpr::Column {
                    column: expanded,
                    alias: Some(alias),
                });
            } else {
                // Generic aggregate; an optional condition filter is inlined
                // for conditional forms (countIf/sumIf).
                let func = m.function.to_uppercase();
                let condition = m.condition_filter.as_ref().and_then(|f| {
                    let sql = compile_filter_inline(f);
                    if sql.is_empty() { None } else { Some(sql) }
                });

                selects.push(SelectExpr::Aggregate {
                    function: func.clone(),
                    column: dim_col.clone(),
                    alias: alias.clone(),
                    condition,
                });

                // selectWhere on a metric becomes a HAVING condition on the
                // rendered aggregate expression.
                if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
                    let agg_expr = if func == "COUNT" && dim_col == "*" {
                        "COUNT(*)".to_string()
                    } else if func == "UNIQ" {
                        format!("COUNT(DISTINCT `{dim_col}`)")
                    } else {
                        format!("{func}(`{dim_col}`)")
                    };

                    let h = parse_select_where_from_value(obj, &agg_expr)?;
                    if !h.is_empty() {
                        having = if having.is_empty() {
                            h
                        } else {
                            FilterNode::And(vec![having, h])
                        };
                    }
                }
            }
        }
    }

    // Ensure all aggregate columns referenced in HAVING are present in selects.
    // Without this, the SQL dialect cannot alias them for ClickHouse HAVING scope.
    ensure_having_columns_in_selects(&having, &mut selects);

    let limit_by = parse_limit_by(args, cube)?;

    // Optional FROM (subquery) template; `{schema}` / `{chain}` placeholders
    // are substituted here.
    let from_subquery = cube.from_subquery.as_ref().map(|s| {
        s.replace("{schema}", &schema).replace("{chain}", network)
    });

    Ok(QueryIR {
        cube: cube.name.clone(),
        schema,
        table,
        selects,
        filters,
        having,
        group_by,
        order_by,
        limit,
        offset,
        limit_by,
        use_final: cube.use_final,
        joins: Vec::new(),
        custom_query_builder: cube.custom_query_builder.clone(),
        from_subquery,
    })
}
189
190/// Parse a selectWhere value object (from GraphQL Value, not ObjectAccessor)
191/// into a HAVING FilterNode.
192fn parse_select_where_from_value(
193    obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
194    aggregate_expr: &str,
195) -> Result<FilterNode, async_graphql::Error> {
196    let mut conditions = Vec::new();
197
198    for (key, op) in &[
199        ("eq", CompareOp::Eq),
200        ("gt", CompareOp::Gt),
201        ("ge", CompareOp::Ge),
202        ("lt", CompareOp::Lt),
203        ("le", CompareOp::Le),
204    ] {
205        if let Some(val) = obj.get(*key) {
206            let sql_val = match val {
207                async_graphql::Value::String(s) => {
208                    if let Ok(f) = s.parse::<f64>() {
209                        SqlValue::Float(f)
210                    } else {
211                        SqlValue::String(s.clone())
212                    }
213                }
214                async_graphql::Value::Number(n) => {
215                    if let Some(f) = n.as_f64() {
216                        SqlValue::Float(f)
217                    } else {
218                        SqlValue::Int(n.as_i64().unwrap_or(0))
219                    }
220                }
221                _ => continue,
222            };
223            conditions.push(FilterNode::Condition {
224                column: aggregate_expr.to_string(),
225                op: op.clone(),
226                value: sql_val,
227            });
228        }
229    }
230
231    Ok(match conditions.len() {
232        0 => FilterNode::Empty,
233        1 => conditions.into_iter().next().unwrap(),
234        _ => FilterNode::And(conditions),
235    })
236}
237
238fn merge_selector_filters(
239    base: FilterNode,
240    args: &ObjectAccessor,
241    selectors: &[SelectorDef],
242) -> Result<FilterNode, async_graphql::Error> {
243    let mut extra = Vec::new();
244
245    for sel in selectors {
246        if let Ok(val) = args.try_get(&sel.graphql_name) {
247            if let Ok(obj) = val.object() {
248                let leaf_filters =
249                    filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
250                extra.extend(leaf_filters);
251            }
252        }
253    }
254
255    if extra.is_empty() {
256        return Ok(base);
257    }
258    if base.is_empty() {
259        return Ok(if extra.len() == 1 {
260            extra.remove(0)
261        } else {
262            FilterNode::And(extra)
263        });
264    }
265    extra.push(base);
266    Ok(FilterNode::And(extra))
267}
268
269fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
270    if defaults.is_empty() {
271        return user_filters;
272    }
273
274    let mut default_nodes: Vec<FilterNode> = defaults
275        .iter()
276        .map(|(col, val)| {
277            let sql_val = if val == "true" || val == "false" {
278                SqlValue::Bool(val == "true")
279            } else if let Ok(n) = val.parse::<i64>() {
280                SqlValue::Int(n)
281            } else {
282                SqlValue::String(val.clone())
283            };
284            FilterNode::Condition {
285                column: col.clone(),
286                op: CompareOp::Eq,
287                value: sql_val,
288            }
289        })
290        .collect();
291
292    if user_filters.is_empty() {
293        if default_nodes.len() == 1 {
294            return default_nodes.remove(0);
295        }
296        return FilterNode::And(default_nodes);
297    }
298
299    default_nodes.push(user_filters);
300    FilterNode::And(default_nodes)
301}
302
303fn parse_limit(
304    args: &ObjectAccessor,
305    default: u32,
306    max: u32,
307) -> Result<(u32, u32), async_graphql::Error> {
308    let mut limit = default;
309    let mut offset = 0u32;
310
311    if let Ok(limit_val) = args.try_get("limit") {
312        if let Ok(limit_obj) = limit_val.object() {
313            if let Ok(count) = limit_obj.try_get("count") {
314                limit = (count.i64()? as u32).min(max);
315            }
316            if let Ok(off) = limit_obj.try_get("offset") {
317                offset = off.i64()? as u32;
318            }
319        }
320    }
321
322    Ok((limit, offset))
323}
324
/// Resolve ordering arguments into a list of `OrderExpr`s.
///
/// The multi-key `orderByList` argument takes precedence; an absent,
/// non-list, or empty `orderByList` falls through to the legacy single-enum
/// `orderBy` (`FIELD_ASC` / `FIELD_DESC`). Unknown fields are errors;
/// no ordering arguments at all yields an empty list.
fn parse_order_by(
    args: &ObjectAccessor,
    cube: &CubeDefinition,
) -> Result<Vec<OrderExpr>, async_graphql::Error> {
    let flat = cube.flat_dimensions();

    // New-style: orderByList: [{ field: FIELD, direction: ASC|DESC }, …]
    if let Ok(list_val) = args.try_get("orderByList") {
        if let Ok(list) = list_val.list() {
            let mut orders = Vec::new();
            for item in list.iter() {
                let obj = item.object()
                    .map_err(|_| async_graphql::Error::new("orderByList items must be objects"))?;
                let field_accessor = obj.try_get("field")
                    .map_err(|_| async_graphql::Error::new("orderByList item requires 'field'"))?;
                let field_str = field_accessor.enum_name()
                    .map_err(|_| async_graphql::Error::new("orderByList 'field' must be an enum value"))?;
                // `direction` is optional; anything other than DESC (or a
                // missing/non-enum value) means ascending.
                let descending = if let Ok(dir_accessor) = obj.try_get("direction") {
                    dir_accessor.enum_name() == Ok("DESC")
                } else {
                    false
                };
                // Map the GraphQL dimension path onto its backing column.
                let column = flat.iter()
                    .find(|(p, _)| p == field_str)
                    .map(|(_, dim)| dim.column.clone())
                    .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {field_str}")))?;
                orders.push(OrderExpr { column, descending });
            }
            // An empty list falls through to the legacy `orderBy` argument.
            if !orders.is_empty() {
                return Ok(orders);
            }
        }
    }

    // Legacy: orderBy: FIELD_ASC | FIELD_DESC
    let order_val = match args.try_get("orderBy") {
        Ok(v) => v,
        Err(_) => return Ok(Vec::new()),
    };

    let enum_str = order_val
        .enum_name()
        .map_err(|_| async_graphql::Error::new("orderBy must be an enum value"))?;

    // Split the direction suffix off the enum name.
    let (descending, field_path) = if let Some(path) = enum_str.strip_suffix("_DESC") {
        (true, path)
    } else if let Some(path) = enum_str.strip_suffix("_ASC") {
        (false, path)
    } else {
        return Err(async_graphql::Error::new(format!(
            "Invalid orderBy value: {enum_str}"
        )));
    };

    let column = flat
        .iter()
        .find(|(p, _)| p == field_path)
        .map(|(_, dim)| dim.column.clone())
        .ok_or_else(|| {
            async_graphql::Error::new(format!("Unknown orderBy field: {field_path}"))
        })?;

    Ok(vec![OrderExpr { column, descending }])
}
387
388/// Compile a FilterNode into an inline SQL fragment (no parameterized bindings).
389/// Used for embedding conditions inside aggregate functions (countIf, sumIf).
390fn compile_filter_inline(node: &FilterNode) -> String {
391    match node {
392        FilterNode::Empty => String::new(),
393        FilterNode::Condition { column, op, value } => {
394            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
395            if op.is_unary() {
396                return format!("{col} {}", op.sql_op());
397            }
398            let val_str = match value {
399                SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
400                SqlValue::Int(i) => i.to_string(),
401                SqlValue::Float(f) => f.to_string(),
402                SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
403            };
404            match op {
405                CompareOp::In | CompareOp::NotIn => {
406                    if let SqlValue::String(csv) = value {
407                        let items: Vec<String> = csv.split(',')
408                            .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
409                            .collect();
410                        format!("{col} {} ({})", op.sql_op(), items.join(", "))
411                    } else {
412                        format!("{col} {} ({val_str})", op.sql_op())
413                    }
414                }
415                CompareOp::Includes => {
416                    let like_val = match value {
417                        SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
418                        _ => val_str,
419                    };
420                    format!("{col} LIKE {like_val}")
421                }
422                _ => format!("{col} {} {val_str}", op.sql_op()),
423            }
424        }
425        FilterNode::And(children) => {
426            let parts: Vec<String> = children.iter()
427                .map(compile_filter_inline)
428                .filter(|s| !s.is_empty())
429                .collect();
430            match parts.len() {
431                0 => String::new(),
432                1 => parts.into_iter().next().unwrap(),
433                _ => format!("({})", parts.join(" AND ")),
434            }
435        }
436        FilterNode::Or(children) => {
437            let parts: Vec<String> = children.iter()
438                .map(compile_filter_inline)
439                .filter(|s| !s.is_empty())
440                .collect();
441            match parts.len() {
442                0 => String::new(),
443                1 => parts.into_iter().next().unwrap(),
444                _ => format!("({})", parts.join(" OR ")),
445            }
446        }
447    }
448}
449
450/// Walk a HAVING FilterNode and append any referenced aggregate columns that
451/// are missing from `selects`. The SQL dialect will assign aliases later.
452fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
453    let cols = collect_having_columns(having);
454    for col in cols {
455        if !col.contains('(') {
456            continue;
457        }
458        let already_present = selects.iter().any(|s| match s {
459            SelectExpr::Column { column, .. } => column == &col,
460            _ => false,
461        });
462        if !already_present {
463            selects.push(SelectExpr::Column {
464                column: col,
465                alias: None,
466            });
467        }
468    }
469}
470
471fn collect_having_columns(node: &FilterNode) -> Vec<String> {
472    match node {
473        FilterNode::Empty => vec![],
474        FilterNode::Condition { column, .. } => vec![column.clone()],
475        FilterNode::And(children) | FilterNode::Or(children) => {
476            children.iter().flat_map(collect_having_columns).collect()
477        }
478    }
479}
480
/// Detect if a column expression is an aggregate function call.
/// Matches patterns like `argMaxMerge(...)`, `countMerge(...)`, `sumMerge(...)`, etc.
/// (Heuristic: any expression containing an opening parenthesis.)
fn is_aggregate_column(column: &str) -> bool {
    column.find('(').is_some()
}
486
487/// Walk a FilterNode tree and split it into (where_part, having_part).
488/// Leaf conditions on aggregate columns go to HAVING; everything else stays in WHERE.
489fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
490    match node {
491        FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
492        FilterNode::Condition { ref column, .. } => {
493            if is_aggregate_column(column) {
494                (FilterNode::Empty, node)
495            } else {
496                (node, FilterNode::Empty)
497            }
498        }
499        FilterNode::And(children) => {
500            let mut where_parts = Vec::new();
501            let mut having_parts = Vec::new();
502            for child in children {
503                let (w, h) = split_aggregate_filters(child);
504                if !w.is_empty() { where_parts.push(w); }
505                if !h.is_empty() { having_parts.push(h); }
506            }
507            let where_node = match where_parts.len() {
508                0 => FilterNode::Empty,
509                1 => where_parts.into_iter().next().unwrap(),
510                _ => FilterNode::And(where_parts),
511            };
512            let having_node = match having_parts.len() {
513                0 => FilterNode::Empty,
514                1 => having_parts.into_iter().next().unwrap(),
515                _ => FilterNode::And(having_parts),
516            };
517            (where_node, having_node)
518        }
519        FilterNode::Or(children) => {
520            let any_aggregate = children.iter().any(filter_has_aggregate);
521            if any_aggregate {
522                (FilterNode::Empty, FilterNode::Or(children))
523            } else {
524                (FilterNode::Or(children), FilterNode::Empty)
525            }
526        }
527    }
528}
529
530fn filter_has_aggregate(node: &FilterNode) -> bool {
531    match node {
532        FilterNode::Empty => false,
533        FilterNode::Condition { column, .. } => is_aggregate_column(column),
534        FilterNode::And(children) | FilterNode::Or(children) => {
535            children.iter().any(filter_has_aggregate)
536        }
537    }
538}
539
540fn parse_limit_by(
541    args: &ObjectAccessor,
542    cube: &CubeDefinition,
543) -> Result<Option<LimitByExpr>, async_graphql::Error> {
544    let lb_val = match args.try_get("limitBy") {
545        Ok(v) => v,
546        Err(_) => return Ok(None),
547    };
548    let lb_obj = lb_val.object()?;
549    let count = lb_obj.try_get("count")?.i64()? as u32;
550    let offset = lb_obj
551        .try_get("offset")
552        .ok()
553        .and_then(|v| v.i64().ok())
554        .unwrap_or(0) as u32;
555    let by_str = lb_obj.try_get("by")?.string()?;
556
557    let flat = cube.flat_dimensions();
558    let columns: Vec<String> = by_str
559        .split(',')
560        .map(|s| {
561            let trimmed = s.trim();
562            flat.iter()
563                .find(|(path, _)| path == trimmed)
564                .map(|(_, dim)| dim.column.clone())
565                .unwrap_or_else(|| trimmed.to_string())
566        })
567        .collect();
568
569    if columns.is_empty() {
570        return Err(async_graphql::Error::new("limitBy.by must specify at least one field"));
571    }
572
573    Ok(Some(LimitByExpr { count, offset, columns }))
574}