//! GraphQL argument/selection-set parser for cube queries.
//! Path: activecube_rs/compiler/parser.rs

1use std::collections::HashSet;
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8
/// Describes a metric requested in the GraphQL selection set.
pub struct MetricRequest {
    /// Aggregate function name as requested (e.g. "count", "uniq", "sum");
    /// upper-cased when compiled into the IR select expression.
    pub function: String,
    /// Dimension path the metric aggregates over; resolved against the
    /// cube's flat dimensions, falling back to `*` when not found.
    pub of_dimension: String,
    /// The raw selectWhere value extracted from GraphQL arguments.
    pub select_where_value: Option<async_graphql::Value>,
    /// Pre-parsed condition filter for conditional aggregation (countIf/sumIf).
    pub condition_filter: Option<FilterNode>,
}
18
/// Parse GraphQL arguments plus the requested metrics/fields into a [`QueryIR`].
///
/// * `cube` — static cube definition (dimensions, selectors, default filters,
///   limits, FINAL flag).
/// * `network` — chain identifier; picks the per-chain table, or — when the
///   cube declares a `chain_column` — is injected as a `chain = ?` filter.
/// * `args` — raw GraphQL field arguments (`where`, `limit`, `orderBy`, …).
/// * `metrics` — aggregate functions requested in the selection set.
/// * `requested_fields` — dimension paths actually selected; `None` means
///   "all dimensions".
///
/// Returns an error when any argument fails validation (bad orderBy enum,
/// malformed limitBy, unparsable where clause, …).
pub fn parse_cube_query(
    cube: &CubeDefinition,
    network: &str,
    args: &ObjectAccessor,
    metrics: &[MetricRequest],
    requested_fields: Option<HashSet<String>>,
) -> Result<QueryIR, async_graphql::Error> {
    let table = cube.table_for_chain(network);

    // Optional `where` argument: absent or non-object means "no user filters".
    let filters = if let Ok(where_val) = args.try_get("where") {
        if let Ok(where_obj) = where_val.object() {
            filter::parse_where(&where_obj, &cube.dimensions)?
        } else {
            FilterNode::Empty
        }
    } else {
        FilterNode::Empty
    };

    // Top-level selector arguments (shortcuts for common dimension filters)
    // are AND-ed with the `where` tree.
    let filters = merge_selector_filters(filters, args, &cube.selectors)?;
    // For tables that use a chain column instead of chain-prefixed table names,
    // inject a WHERE chain = ? filter automatically.
    let filters = if let Some(ref chain_col) = cube.chain_column {
        let chain_filter = FilterNode::Condition {
            column: chain_col.clone(),
            op: CompareOp::Eq,
            value: SqlValue::String(network.to_string()),
        };
        if filters.is_empty() {
            chain_filter
        } else {
            FilterNode::And(vec![chain_filter, filters])
        }
    } else {
        filters
    };
    let filters = apply_default_filters(filters, &cube.default_filters);
    let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;
    let order_by = parse_order_by(args, cube)?;

    // Project only the dimensions the client actually selected
    // (`requested_fields = None` selects everything).
    let flat = cube.flat_dimensions();
    let mut selects: Vec<SelectExpr> = flat
        .iter()
        .filter(|(path, _)| {
            requested_fields
                .as_ref()
                .is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // When only metrics are requested with no dimension fields, keep selects empty
    // so GROUP BY is also empty → produces a single aggregated row (e.g. total count).
    // Only fall back to all dimensions when there are NO metrics either (pure wildcard).
    if selects.is_empty() && !flat.is_empty() && metrics.is_empty() {
        selects = flat
            .iter()
            .map(|(_, dim)| SelectExpr::Column {
                column: dim.column.clone(),
                alias: None,
            })
            .collect();
    }

    // Split WHERE filters: conditions on aggregate columns (containing function
    // calls like argMaxMerge, countMerge, etc.) must go into HAVING, not WHERE.
    let (filters, agg_having) = split_aggregate_filters(filters);

    let mut group_by = Vec::new();
    let mut having = agg_having;

    if !metrics.is_empty() {
        // Group by every plain dimension column selected so far. This runs
        // before aggregate selects are pushed, so only dimensions end up in
        // GROUP BY.
        group_by = selects
            .iter()
            .filter_map(|s| match s {
                SelectExpr::Column { column, .. } => Some(column.clone()),
                _ => None,
            })
            .collect();

        for m in metrics {
            // Resolve the metric's dimension path to a physical column;
            // unknown paths aggregate over `*` (e.g. COUNT(*)).
            let dim_col = flat
                .iter()
                .find(|(path, _)| path == &m.of_dimension)
                .map(|(_, dim)| dim.column.clone())
                .unwrap_or_else(|| "*".to_string());

            let func = m.function.to_uppercase();
            let alias = format!("__{}", m.function);

            // Optional countIf/sumIf-style condition, compiled to an inline
            // SQL fragment (empty fragments are dropped).
            let condition = m.condition_filter.as_ref().and_then(|f| {
                let sql = compile_filter_inline(f);
                if sql.is_empty() { None } else { Some(sql) }
            });

            selects.push(SelectExpr::Aggregate {
                function: func.clone(),
                column: dim_col.clone(),
                alias,
                condition,
            });

            // Per-metric selectWhere becomes a HAVING condition on the
            // spelled-out aggregate expression.
            if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
                let agg_expr = if func == "COUNT" && dim_col == "*" {
                    "COUNT(*)".to_string()
                } else if func == "UNIQ" {
                    format!("COUNT(DISTINCT `{dim_col}`)")
                } else {
                    format!("{func}(`{dim_col}`)")
                };

                let h = parse_select_where_from_value(obj, &agg_expr)?;
                if !h.is_empty() {
                    having = if having.is_empty() {
                        h
                    } else {
                        FilterNode::And(vec![having, h])
                    };
                }
            }
        }
    }

    // Ensure all aggregate columns referenced in HAVING are present in selects.
    // Without this, the SQL dialect cannot alias them for ClickHouse HAVING scope.
    ensure_having_columns_in_selects(&having, &mut selects);

    let limit_by = parse_limit_by(args, cube)?;

    Ok(QueryIR {
        cube: cube.name.clone(),
        schema: cube.schema.clone(),
        table,
        selects,
        filters,
        having,
        group_by,
        order_by,
        limit,
        offset,
        limit_by,
        use_final: cube.use_final,
        joins: Vec::new(),
    })
}
167
168/// Parse a selectWhere value object (from GraphQL Value, not ObjectAccessor)
169/// into a HAVING FilterNode.
170fn parse_select_where_from_value(
171    obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
172    aggregate_expr: &str,
173) -> Result<FilterNode, async_graphql::Error> {
174    let mut conditions = Vec::new();
175
176    for (key, op) in &[
177        ("eq", CompareOp::Eq),
178        ("gt", CompareOp::Gt),
179        ("ge", CompareOp::Ge),
180        ("lt", CompareOp::Lt),
181        ("le", CompareOp::Le),
182    ] {
183        if let Some(val) = obj.get(*key) {
184            let sql_val = match val {
185                async_graphql::Value::String(s) => {
186                    if let Ok(f) = s.parse::<f64>() {
187                        SqlValue::Float(f)
188                    } else {
189                        SqlValue::String(s.clone())
190                    }
191                }
192                async_graphql::Value::Number(n) => {
193                    if let Some(f) = n.as_f64() {
194                        SqlValue::Float(f)
195                    } else {
196                        SqlValue::Int(n.as_i64().unwrap_or(0))
197                    }
198                }
199                _ => continue,
200            };
201            conditions.push(FilterNode::Condition {
202                column: aggregate_expr.to_string(),
203                op: op.clone(),
204                value: sql_val,
205            });
206        }
207    }
208
209    Ok(match conditions.len() {
210        0 => FilterNode::Empty,
211        1 => conditions.into_iter().next().unwrap(),
212        _ => FilterNode::And(conditions),
213    })
214}
215
216fn merge_selector_filters(
217    base: FilterNode,
218    args: &ObjectAccessor,
219    selectors: &[SelectorDef],
220) -> Result<FilterNode, async_graphql::Error> {
221    let mut extra = Vec::new();
222
223    for sel in selectors {
224        if let Ok(val) = args.try_get(&sel.graphql_name) {
225            if let Ok(obj) = val.object() {
226                let leaf_filters =
227                    filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
228                extra.extend(leaf_filters);
229            }
230        }
231    }
232
233    if extra.is_empty() {
234        return Ok(base);
235    }
236    if base.is_empty() {
237        return Ok(if extra.len() == 1 {
238            extra.remove(0)
239        } else {
240            FilterNode::And(extra)
241        });
242    }
243    extra.push(base);
244    Ok(FilterNode::And(extra))
245}
246
247fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
248    if defaults.is_empty() {
249        return user_filters;
250    }
251
252    let mut default_nodes: Vec<FilterNode> = defaults
253        .iter()
254        .map(|(col, val)| {
255            let sql_val = if val == "true" || val == "false" {
256                SqlValue::Bool(val == "true")
257            } else if let Ok(n) = val.parse::<i64>() {
258                SqlValue::Int(n)
259            } else {
260                SqlValue::String(val.clone())
261            };
262            FilterNode::Condition {
263                column: col.clone(),
264                op: CompareOp::Eq,
265                value: sql_val,
266            }
267        })
268        .collect();
269
270    if user_filters.is_empty() {
271        if default_nodes.len() == 1 {
272            return default_nodes.remove(0);
273        }
274        return FilterNode::And(default_nodes);
275    }
276
277    default_nodes.push(user_filters);
278    FilterNode::And(default_nodes)
279}
280
281fn parse_limit(
282    args: &ObjectAccessor,
283    default: u32,
284    max: u32,
285) -> Result<(u32, u32), async_graphql::Error> {
286    let mut limit = default;
287    let mut offset = 0u32;
288
289    if let Ok(limit_val) = args.try_get("limit") {
290        if let Ok(limit_obj) = limit_val.object() {
291            if let Ok(count) = limit_obj.try_get("count") {
292                limit = (count.i64()? as u32).min(max);
293            }
294            if let Ok(off) = limit_obj.try_get("offset") {
295                offset = off.i64()? as u32;
296            }
297        }
298    }
299
300    Ok((limit, offset))
301}
302
/// Resolve ordering arguments into a list of `OrderExpr`s.
///
/// Two argument forms are supported:
/// 1. `orderByList`: a list of `{ field, direction }` objects — takes
///    precedence when present and non-empty;
/// 2. `orderBy`: a single enum value of the form `SOME_FIELD_ASC` /
///    `SOME_FIELD_DESC`.
///
/// Field names are dimension paths mapped to physical columns via
/// `cube.flat_dimensions()`; an unknown field is an error. No ordering
/// argument at all yields an empty Vec.
fn parse_order_by(
    args: &ObjectAccessor,
    cube: &CubeDefinition,
) -> Result<Vec<OrderExpr>, async_graphql::Error> {
    let flat = cube.flat_dimensions();

    if let Ok(list_val) = args.try_get("orderByList") {
        if let Ok(list) = list_val.list() {
            let mut orders = Vec::new();
            for item in list.iter() {
                let obj = item.object()
                    .map_err(|_| async_graphql::Error::new("orderByList items must be objects"))?;
                let field_accessor = obj.try_get("field")
                    .map_err(|_| async_graphql::Error::new("orderByList item requires 'field'"))?;
                let field_str = field_accessor.enum_name()
                    .map_err(|_| async_graphql::Error::new("orderByList 'field' must be an enum value"))?;
                // `direction` is optional; anything other than DESC sorts ascending.
                let descending = if let Ok(dir_accessor) = obj.try_get("direction") {
                    dir_accessor.enum_name() == Ok("DESC")
                } else {
                    false
                };
                let column = flat.iter()
                    .find(|(p, _)| p == field_str)
                    .map(|(_, dim)| dim.column.clone())
                    .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {field_str}")))?;
                orders.push(OrderExpr { column, descending });
            }
            if !orders.is_empty() {
                return Ok(orders);
            }
            // An empty orderByList falls through to the scalar orderBy form below.
        }
    }

    let order_val = match args.try_get("orderBy") {
        Ok(v) => v,
        Err(_) => return Ok(Vec::new()),
    };

    let enum_str = order_val
        .enum_name()
        .map_err(|_| async_graphql::Error::new("orderBy must be an enum value"))?;

    // The enum value encodes both field path and direction as a suffix.
    let (descending, field_path) = if let Some(path) = enum_str.strip_suffix("_DESC") {
        (true, path)
    } else if let Some(path) = enum_str.strip_suffix("_ASC") {
        (false, path)
    } else {
        return Err(async_graphql::Error::new(format!(
            "Invalid orderBy value: {enum_str}"
        )));
    };

    let column = flat
        .iter()
        .find(|(p, _)| p == field_path)
        .map(|(_, dim)| dim.column.clone())
        .ok_or_else(|| {
            async_graphql::Error::new(format!("Unknown orderBy field: {field_path}"))
        })?;

    Ok(vec![OrderExpr { column, descending }])
}
365
366/// Compile a FilterNode into an inline SQL fragment (no parameterized bindings).
367/// Used for embedding conditions inside aggregate functions (countIf, sumIf).
368fn compile_filter_inline(node: &FilterNode) -> String {
369    match node {
370        FilterNode::Empty => String::new(),
371        FilterNode::Condition { column, op, value } => {
372            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
373            if op.is_unary() {
374                return format!("{col} {}", op.sql_op());
375            }
376            let val_str = match value {
377                SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
378                SqlValue::Int(i) => i.to_string(),
379                SqlValue::Float(f) => f.to_string(),
380                SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
381            };
382            match op {
383                CompareOp::In | CompareOp::NotIn => {
384                    if let SqlValue::String(csv) = value {
385                        let items: Vec<String> = csv.split(',')
386                            .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
387                            .collect();
388                        format!("{col} {} ({})", op.sql_op(), items.join(", "))
389                    } else {
390                        format!("{col} {} ({val_str})", op.sql_op())
391                    }
392                }
393                CompareOp::Includes => {
394                    let like_val = match value {
395                        SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
396                        _ => val_str,
397                    };
398                    format!("{col} LIKE {like_val}")
399                }
400                _ => format!("{col} {} {val_str}", op.sql_op()),
401            }
402        }
403        FilterNode::And(children) => {
404            let parts: Vec<String> = children.iter()
405                .map(compile_filter_inline)
406                .filter(|s| !s.is_empty())
407                .collect();
408            match parts.len() {
409                0 => String::new(),
410                1 => parts.into_iter().next().unwrap(),
411                _ => format!("({})", parts.join(" AND ")),
412            }
413        }
414        FilterNode::Or(children) => {
415            let parts: Vec<String> = children.iter()
416                .map(compile_filter_inline)
417                .filter(|s| !s.is_empty())
418                .collect();
419            match parts.len() {
420                0 => String::new(),
421                1 => parts.into_iter().next().unwrap(),
422                _ => format!("({})", parts.join(" OR ")),
423            }
424        }
425    }
426}
427
428/// Walk a HAVING FilterNode and append any referenced aggregate columns that
429/// are missing from `selects`. The SQL dialect will assign aliases later.
430fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
431    let cols = collect_having_columns(having);
432    for col in cols {
433        if !col.contains('(') {
434            continue;
435        }
436        let already_present = selects.iter().any(|s| match s {
437            SelectExpr::Column { column, .. } => column == &col,
438            _ => false,
439        });
440        if !already_present {
441            selects.push(SelectExpr::Column {
442                column: col,
443                alias: None,
444            });
445        }
446    }
447}
448
449fn collect_having_columns(node: &FilterNode) -> Vec<String> {
450    match node {
451        FilterNode::Empty => vec![],
452        FilterNode::Condition { column, .. } => vec![column.clone()],
453        FilterNode::And(children) | FilterNode::Or(children) => {
454            children.iter().flat_map(collect_having_columns).collect()
455        }
456    }
457}
458
459/// Detect if a column expression is an aggregate function call.
460/// Matches patterns like `argMaxMerge(...)`, `countMerge(...)`, `sumMerge(...)`, etc.
/// Detect if a column expression is an aggregate function call.
/// Heuristic: any expression containing an opening parenthesis (e.g.
/// `argMaxMerge(...)`, `countMerge(...)`) is treated as an aggregate;
/// bare column names are not.
fn is_aggregate_column(column: &str) -> bool {
    // Byte scan is safe: '(' is ASCII, so it can only occur as itself in UTF-8.
    column.bytes().any(|b| b == b'(')
}
464
465/// Walk a FilterNode tree and split it into (where_part, having_part).
466/// Leaf conditions on aggregate columns go to HAVING; everything else stays in WHERE.
467fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
468    match node {
469        FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
470        FilterNode::Condition { ref column, .. } => {
471            if is_aggregate_column(column) {
472                (FilterNode::Empty, node)
473            } else {
474                (node, FilterNode::Empty)
475            }
476        }
477        FilterNode::And(children) => {
478            let mut where_parts = Vec::new();
479            let mut having_parts = Vec::new();
480            for child in children {
481                let (w, h) = split_aggregate_filters(child);
482                if !w.is_empty() { where_parts.push(w); }
483                if !h.is_empty() { having_parts.push(h); }
484            }
485            let where_node = match where_parts.len() {
486                0 => FilterNode::Empty,
487                1 => where_parts.into_iter().next().unwrap(),
488                _ => FilterNode::And(where_parts),
489            };
490            let having_node = match having_parts.len() {
491                0 => FilterNode::Empty,
492                1 => having_parts.into_iter().next().unwrap(),
493                _ => FilterNode::And(having_parts),
494            };
495            (where_node, having_node)
496        }
497        FilterNode::Or(children) => {
498            let any_aggregate = children.iter().any(filter_has_aggregate);
499            if any_aggregate {
500                (FilterNode::Empty, FilterNode::Or(children))
501            } else {
502                (FilterNode::Or(children), FilterNode::Empty)
503            }
504        }
505    }
506}
507
508fn filter_has_aggregate(node: &FilterNode) -> bool {
509    match node {
510        FilterNode::Empty => false,
511        FilterNode::Condition { column, .. } => is_aggregate_column(column),
512        FilterNode::And(children) | FilterNode::Or(children) => {
513            children.iter().any(filter_has_aggregate)
514        }
515    }
516}
517
518fn parse_limit_by(
519    args: &ObjectAccessor,
520    cube: &CubeDefinition,
521) -> Result<Option<LimitByExpr>, async_graphql::Error> {
522    let lb_val = match args.try_get("limitBy") {
523        Ok(v) => v,
524        Err(_) => return Ok(None),
525    };
526    let lb_obj = lb_val.object()?;
527    let count = lb_obj.try_get("count")?.i64()? as u32;
528    let offset = lb_obj
529        .try_get("offset")
530        .ok()
531        .and_then(|v| v.i64().ok())
532        .unwrap_or(0) as u32;
533    let by_str = lb_obj.try_get("by")?.string()?;
534
535    let flat = cube.flat_dimensions();
536    let columns: Vec<String> = by_str
537        .split(',')
538        .map(|s| {
539            let trimmed = s.trim();
540            flat.iter()
541                .find(|(path, _)| path == trimmed)
542                .map(|(_, dim)| dim.column.clone())
543                .unwrap_or_else(|| trimmed.to_string())
544        })
545        .collect();
546
547    if columns.is_empty() {
548        return Err(async_graphql::Error::new("limitBy.by must specify at least one field"));
549    }
550
551    Ok(Some(LimitByExpr { count, offset, columns }))
552}