1use std::collections::{HashMap, HashSet};
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8use crate::schema::generator::{
9 CalculateRequest, DimAggRequest, FieldAliasMap, QuantileRequest, TimeIntervalRequest,
10 metric_key, dim_agg_key,
11};
12
/// A single aggregate-metric request parsed from the GraphQL selection set.
pub struct MetricRequest {
    /// Aggregate function name as requested (e.g. "sum"); matched against the
    /// cube's metric definitions, otherwise upper-cased into a SQL function.
    pub function: String,
    /// GraphQL alias for this metric; turned into the SQL column alias via `metric_key`.
    pub alias: String,
    /// Dimension path the aggregate applies to; resolved to a physical column,
    /// falling back to "*" when the path is unknown.
    pub of_dimension: String,
    /// Optional `selectWhere` argument (an object of comparison operators)
    /// that becomes a HAVING condition on this metric's aggregate expression.
    pub select_where_value: Option<async_graphql::Value>,
    /// Optional filter compiled inline into the aggregate's condition string.
    pub condition_filter: Option<FilterNode>,
}
24
/// Translates a GraphQL cube query — arguments plus pre-parsed metric,
/// quantile, calculate, dim-agg and time-interval requests — into a
/// backend-agnostic `QueryIR`.
///
/// The stages run in a fixed order: resolve table, build WHERE filters,
/// build the SELECT list, split aggregate conditions into HAVING, add
/// aggregates/quantiles/calculated columns, then ORDER BY / LIMIT BY.
///
/// # Errors
/// Propagates errors from `where` parsing, selector filters, limit parsing,
/// `selectWhere` parsing, orderBy and limitBy parsing.
#[allow(clippy::too_many_arguments)]
pub fn parse_cube_query(
    cube: &CubeDefinition,
    network: &str,
    args: &ObjectAccessor,
    metrics: &[MetricRequest],
    quantiles: &[QuantileRequest],
    calculates: &[CalculateRequest],
    field_aliases: &FieldAliasMap,
    dim_aggs: &[DimAggRequest],
    time_intervals: &[TimeIntervalRequest],
    requested_fields: Option<HashSet<String>>,
) -> Result<QueryIR, async_graphql::Error> {
    // Columns actually requested by the client (None => everything); the
    // cube may pick a narrower physical table based on this set.
    let flat = cube.flat_dimensions();
    let requested_cols: Vec<String> = flat.iter()
        .filter(|(path, _)| {
            requested_fields.as_ref().is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| dim.column.clone())
        .collect();
    let (schema, table) = cube.resolve_table(network, &requested_cols);

    // User-supplied `where` argument; anything non-object is ignored.
    let filters = if let Ok(where_val) = args.try_get("where") {
        if let Ok(where_obj) = where_val.object() {
            filter::parse_where(&where_obj, &cube.dimensions)?
        } else {
            FilterNode::Empty
        }
    } else {
        FilterNode::Empty
    };

    // Selector arguments, then an implicit chain/network equality filter
    // (when the cube stores multiple networks in one table), then cube-level
    // default filters — all AND-ed together.
    let filters = merge_selector_filters(filters, args, &cube.selectors)?;
    let filters = if let Some(ref chain_col) = cube.chain_column {
        let chain_filter = FilterNode::Condition {
            column: chain_col.clone(),
            op: CompareOp::Eq,
            value: SqlValue::String(network.to_string()),
        };
        if filters.is_empty() {
            chain_filter
        } else {
            FilterNode::And(vec![chain_filter, filters])
        }
    } else {
        filters
    };
    let filters = apply_default_filters(filters, &cube.default_filters);
    let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;

    // SELECT list: one plain column per requested dimension.
    let mut selects: Vec<SelectExpr> = flat
        .iter()
        .filter(|(path, _)| {
            requested_fields
                .as_ref()
                .is_none_or(|rf| rf.contains(path))
        })
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // If the client requested no dimension fields at all and there are no
    // aggregates either, fall back to selecting every dimension.
    if selects.is_empty() && !flat.is_empty() && metrics.is_empty() && dim_aggs.is_empty() {
        selects = flat
            .iter()
            .map(|(_, dim)| SelectExpr::Column {
                column: dim.column.clone(),
                alias: None,
            })
            .collect();
    }

    // Time-interval bucketing: rewrite the first matching plain column into
    // a toStartOfInterval(...) expression with a dim-agg-style alias.
    for ti in time_intervals {
        let interval_expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
        let alias = dim_agg_key(&ti.graphql_alias);
        for sel in &mut selects {
            if let SelectExpr::Column { column, alias: ref mut a } = sel {
                if column == &ti.column {
                    *column = interval_expr.clone();
                    *a = Some(alias.clone());
                    break;
                }
            }
        }
    }

    // Conditions on aggregate expressions must go to HAVING, not WHERE.
    let (filters, agg_having) = split_aggregate_filters(filters);
    let mut group_by = Vec::new();
    let mut having = agg_having;

    if !metrics.is_empty() || !dim_aggs.is_empty() {
        // Columns consumed by argMax/argMin must not be grouped on.
        let agg_columns: HashSet<String> = dim_aggs.iter()
            .map(|da| da.value_column.clone())
            .collect();

        // Group on every remaining plain select column.
        group_by = selects
            .iter()
            .filter_map(|s| match s {
                SelectExpr::Column { column, .. } if !agg_columns.contains(column) => {
                    Some(column.clone())
                }
                _ => None,
            })
            .collect();

        for da in dim_aggs {
            // The value column is replaced by the argMax/argMin expression.
            selects.retain(|s| !matches!(s, SelectExpr::Column { column, .. } if column == &da.value_column));
            let alias = dim_agg_key(&da.graphql_alias);
            let condition = da.condition_filter.as_ref().and_then(|f| {
                let sql = compile_filter_inline(f);
                if sql.is_empty() { None } else { Some(sql) }
            });
            let func_name = match da.agg_type {
                DimAggType::ArgMax => "argMax",
                DimAggType::ArgMin => "argMin",
            };

            selects.push(SelectExpr::DimAggregate {
                agg_type: da.agg_type.clone(),
                value_column: da.value_column.clone(),
                compare_column: da.compare_column.clone(),
                alias: alias.clone(),
                condition,
            });

            // selectWhere on a dim-agg becomes a HAVING condition on the
            // full argMax/argMin expression.
            if let Some(async_graphql::Value::Object(ref obj)) = da.select_where_value {
                let agg_expr = format!("{func_name}(`{}`, `{}`)", da.value_column, da.compare_column);
                let h = parse_select_where_from_value(obj, &agg_expr)?;
                if !h.is_empty() {
                    having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
                }
            }
        }

        for m in metrics {
            // Unknown dimension paths fall back to "*" (count-style metrics).
            let dim_col = flat.iter()
                .find(|(path, _)| path == &m.of_dimension)
                .map(|(_, dim)| dim.column.clone())
                .unwrap_or_else(|| "*".to_string());
            let alias = metric_key(&m.alias);
            let metric_def = cube.find_metric(&m.function);

            // A cube-defined metric with an expression template wins over
            // the generic aggregate-function path.
            if let Some(md) = metric_def.filter(|md| md.expression_template.is_some()) {
                let tmpl = md.expression_template.as_ref().unwrap();
                let expanded = tmpl.replace("{column}", &dim_col);
                selects.push(SelectExpr::Column { column: expanded, alias: Some(alias) });
            } else {
                let func = m.function.to_uppercase();
                let condition = m.condition_filter.as_ref().and_then(|f| {
                    let sql = compile_filter_inline(f);
                    if sql.is_empty() { None } else { Some(sql) }
                });
                selects.push(SelectExpr::Aggregate {
                    function: func.clone(), column: dim_col.clone(),
                    alias: alias.clone(), condition,
                });
                // selectWhere compares against the metric's SQL expression.
                // NOTE(review): this spells COUNT/UNIQ as COUNT(DISTINCT ...),
                // while `format_agg_sql` renders them differently — confirm
                // the two are meant to diverge.
                if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
                    let agg_expr = if func == "COUNT" && dim_col == "*" { "COUNT(*)".into() }
                    else if func == "COUNT" || func == "UNIQ" { format!("COUNT(DISTINCT `{dim_col}`)") }
                    else { format!("{func}(`{dim_col}`)") };
                    let h = parse_select_where_from_value(obj, &agg_expr)?;
                    if !h.is_empty() {
                        having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
                    }
                }
            }
        }
    }

    for q in quantiles {
        let dim_col = flat.iter()
            .find(|(path, _)| path == &q.of_dimension)
            .map(|(_, dim)| dim.column.clone())
            .unwrap_or_else(|| "*".to_string());
        let alias = metric_key(&q.alias);
        let expr = format!("quantile({})(`{}`)", q.level, dim_col);
        selects.push(SelectExpr::Column { column: expr, alias: Some(alias) });
        // Quantiles need grouping on the plain columns when no other
        // aggregate already established a GROUP BY.
        // NOTE(review): this recomputes group_by inside the loop; with a
        // single quantile it is equivalent to doing it once after the loop.
        if group_by.is_empty() && !selects.iter().any(|s| matches!(s, SelectExpr::Aggregate { .. })) {
            group_by = selects.iter().filter_map(|s| match s {
                SelectExpr::Column { column, alias } if alias.is_none() && !column.contains('(') => Some(column.clone()),
                _ => None,
            }).collect();
        }
    }

    // Calculated fields may reference any key of the SELECT built so far.
    let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);

    for calc in calculates {
        let alias = metric_key(&calc.alias);
        let resolved = resolve_calculate_expr(&calc.expression, &allowed_keys);
        // ifNotFinite guards against division by zero / NaN in ratios.
        selects.push(SelectExpr::Column {
            column: format!("ifNotFinite(({resolved}), 0)"),
            alias: Some(alias),
        });
    }

    ensure_having_columns_in_selects(&having, &mut selects);

    // Recollect keys so ORDER BY can also reference calculated columns and
    // the aggregate expressions just added for HAVING.
    let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);
    let order_by = parse_order_by(args, cube, &allowed_keys)?;
    let limit_by = parse_limit_by(args, cube)?;

    let from_subquery = cube.from_subquery.as_ref().map(|s| {
        s.replace("{schema}", &schema).replace("{chain}", network)
    });

    Ok(QueryIR {
        cube: cube.name.clone(),
        schema,
        table,
        selects,
        filters,
        having,
        group_by,
        order_by,
        limit,
        offset,
        limit_by,
        use_final: cube.use_final,
        joins: Vec::new(),
        custom_query_builder: cube.custom_query_builder.clone(),
        from_subquery,
    })
}
253
254fn parse_select_where_from_value(
257 obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
258 aggregate_expr: &str,
259) -> Result<FilterNode, async_graphql::Error> {
260 let mut conditions = Vec::new();
261
262 for (key, op) in &[
263 ("eq", CompareOp::Eq),
264 ("ne", CompareOp::Ne),
265 ("gt", CompareOp::Gt),
266 ("ge", CompareOp::Ge),
267 ("lt", CompareOp::Lt),
268 ("le", CompareOp::Le),
269 ] {
270 if let Some(val) = obj.get(*key) {
271 let sql_val = match val {
272 async_graphql::Value::String(s) => {
273 if let Ok(f) = s.parse::<f64>() {
274 SqlValue::Float(f)
275 } else {
276 SqlValue::String(s.clone())
277 }
278 }
279 async_graphql::Value::Number(n) => {
280 if let Some(f) = n.as_f64() {
281 SqlValue::Float(f)
282 } else {
283 SqlValue::Int(n.as_i64().unwrap_or(0))
284 }
285 }
286 _ => continue,
287 };
288 conditions.push(FilterNode::Condition {
289 column: aggregate_expr.to_string(),
290 op: op.clone(),
291 value: sql_val,
292 });
293 }
294 }
295
296 Ok(match conditions.len() {
297 0 => FilterNode::Empty,
298 1 => conditions.into_iter().next().unwrap(),
299 _ => FilterNode::And(conditions),
300 })
301}
302
303fn merge_selector_filters(
304 base: FilterNode,
305 args: &ObjectAccessor,
306 selectors: &[SelectorDef],
307) -> Result<FilterNode, async_graphql::Error> {
308 let mut extra = Vec::new();
309
310 for sel in selectors {
311 if let Ok(val) = args.try_get(&sel.graphql_name) {
312 if let Ok(obj) = val.object() {
313 let leaf_filters =
314 filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
315 extra.extend(leaf_filters);
316 }
317 }
318 }
319
320 if extra.is_empty() {
321 return Ok(base);
322 }
323 if base.is_empty() {
324 return Ok(if extra.len() == 1 {
325 extra.remove(0)
326 } else {
327 FilterNode::And(extra)
328 });
329 }
330 extra.push(base);
331 Ok(FilterNode::And(extra))
332}
333
334fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
335 if defaults.is_empty() {
336 return user_filters;
337 }
338
339 let mut default_nodes: Vec<FilterNode> = defaults
340 .iter()
341 .map(|(col, val)| {
342 let sql_val = if val == "true" || val == "false" {
343 SqlValue::Bool(val == "true")
344 } else if let Ok(n) = val.parse::<i64>() {
345 SqlValue::Int(n)
346 } else {
347 SqlValue::String(val.clone())
348 };
349 FilterNode::Condition {
350 column: col.clone(),
351 op: CompareOp::Eq,
352 value: sql_val,
353 }
354 })
355 .collect();
356
357 if user_filters.is_empty() {
358 if default_nodes.len() == 1 {
359 return default_nodes.remove(0);
360 }
361 return FilterNode::And(default_nodes);
362 }
363
364 default_nodes.push(user_filters);
365 FilterNode::And(default_nodes)
366}
367
368fn parse_limit(
369 args: &ObjectAccessor,
370 default: u32,
371 max: u32,
372) -> Result<(u32, u32), async_graphql::Error> {
373 let mut limit = default;
374 let mut offset = 0u32;
375
376 if let Ok(limit_val) = args.try_get("limit") {
377 if let Ok(limit_obj) = limit_val.object() {
378 if let Ok(count) = limit_obj.try_get("count") {
379 limit = (count.i64()? as u32).min(max);
380 }
381 if let Ok(off) = limit_obj.try_get("offset") {
382 offset = off.i64()? as u32;
383 }
384 }
385 }
386
387 Ok((limit, offset))
388}
389
390fn parse_order_by(
391 args: &ObjectAccessor,
392 cube: &CubeDefinition,
393 allowed_keys: &HashMap<String, String>,
394) -> Result<Vec<OrderExpr>, async_graphql::Error> {
395 let order_val = match args.try_get("orderBy") {
396 Ok(v) => v,
397 Err(_) => return Ok(Vec::new()),
398 };
399
400 let obj = order_val.object()
401 .map_err(|_| async_graphql::Error::new("orderBy must be an object"))?;
402 let flat = cube.flat_dimensions();
403
404 if let Ok(field) = obj.try_get("descending") {
405 let path = field.enum_name()
406 .map_err(|_| async_graphql::Error::new("orderBy.descending must be an enum value"))?;
407 let column = flat.iter()
408 .find(|(p, _)| p == path)
409 .map(|(_, dim)| dim.column.clone())
410 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
411 return Ok(vec![OrderExpr { column, descending: true }]);
412 }
413
414 if let Ok(field) = obj.try_get("ascending") {
415 let path = field.enum_name()
416 .map_err(|_| async_graphql::Error::new("orderBy.ascending must be an enum value"))?;
417 let column = flat.iter()
418 .find(|(p, _)| p == path)
419 .map(|(_, dim)| dim.column.clone())
420 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
421 return Ok(vec![OrderExpr { column, descending: false }]);
422 }
423
424 if let Ok(field_str) = obj.try_get("descendingByField") {
425 let name = field_str.string()
426 .map_err(|_| async_graphql::Error::new("descendingByField must be a string"))?;
427 let column = resolve_field_in_keys(name, allowed_keys)?;
428 return Ok(vec![OrderExpr { column, descending: true }]);
429 }
430
431 if let Ok(field_str) = obj.try_get("ascendingByField") {
432 let name = field_str.string()
433 .map_err(|_| async_graphql::Error::new("ascendingByField must be a string"))?;
434 let column = resolve_field_in_keys(name, allowed_keys)?;
435 return Ok(vec![OrderExpr { column, descending: false }]);
436 }
437
438 Ok(vec![])
439}
440
441fn resolve_field_in_keys(
444 name: &str,
445 allowed_keys: &HashMap<String, String>,
446) -> Result<String, async_graphql::Error> {
447 if let Some(expr) = allowed_keys.get(name) { return Ok(expr.clone()); }
448 Err(async_graphql::Error::new(format!(
449 "Can't use '{name}' in sorting/ordering. Field not found in executed query."
450 )))
451}
452
/// Builds the map of "name a client may reference" -> "SQL expression" used
/// for orderBy resolution and calculate-expression expansion.
///
/// Each select contributes its alias (and, for prefixed aliases, the
/// unprefixed name too — "__da_" appears to be the dim-agg alias prefix and
/// "__" the metric prefix; presumably produced by `dim_agg_key`/`metric_key`,
/// TODO confirm). Dim-aggs and time intervals additionally register their
/// GraphQL-path spellings; `field_aliases` fills in any remaining names
/// without overwriting existing entries.
fn collect_select_keys(
    selects: &[SelectExpr],
    flat: &[(String, crate::cube::definition::Dimension)],
    field_aliases: &FieldAliasMap,
    dim_aggs: &[DimAggRequest],
    time_intervals: &[TimeIntervalRequest],
) -> HashMap<String, String> {
    let mut keys = HashMap::new();
    for sel in selects {
        match sel {
            SelectExpr::Column { column, alias: Some(a) } => {
                keys.insert(a.clone(), column.clone());
                // "__da_" must be checked before the more general "__".
                if let Some(name) = a.strip_prefix("__da_") {
                    keys.insert(name.to_string(), column.clone());
                } else if let Some(name) = a.strip_prefix("__") {
                    keys.insert(name.to_string(), column.clone());
                }
            }
            SelectExpr::Column { column, alias: None } => {
                // Plain dimension columns are reachable both by their
                // dimension path and by the raw column name.
                if let Some((path, _)) = flat.iter().find(|(_, d)| d.column == *column) {
                    keys.insert(path.clone(), column.clone());
                }
                keys.insert(column.clone(), column.clone());
            }
            SelectExpr::Aggregate { alias, function, column, .. } => {
                let expr = format_agg_sql(function, column);
                keys.insert(alias.clone(), expr.clone());
                if let Some(name) = alias.strip_prefix("__") {
                    keys.insert(name.to_string(), expr);
                }
            }
            SelectExpr::DimAggregate { alias, agg_type, value_column, compare_column, .. } => {
                let expr = format_dim_agg_sql(agg_type, value_column, compare_column);
                keys.insert(alias.clone(), expr.clone());
                if let Some(name) = alias.strip_prefix("__da_") {
                    keys.insert(name.to_string(), expr);
                }
            }
        }
    }
    // Dim-aggs: register "<alias>_maximum"/"<alias>_minimum" style names for
    // both the GraphQL alias and the field path, plus parent-path variants.
    for da in dim_aggs {
        let suffix = match da.agg_type { DimAggType::ArgMax => "maximum", DimAggType::ArgMin => "minimum" };
        let expr = format_dim_agg_sql(&da.agg_type, &da.value_column, &da.compare_column);
        keys.entry(format!("{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
        keys.entry(format!("{}_{suffix}", da.field_path)).or_insert_with(|| expr.clone());
        if let Some(i) = da.field_path.rfind('_') {
            // Parent path = field path with its last '_'-separated segment cut off.
            let parent = &da.field_path[..i];
            keys.entry(format!("{parent}_{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
            keys.entry(format!("{parent}_{}", da.graphql_alias)).or_insert_with(|| expr.clone());
        }
    }
    // Time intervals are addressable as "<parent path>_<alias>".
    for ti in time_intervals {
        let expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
        if let Some(i) = ti.field_path.rfind('_') {
            let parent = &ti.field_path[..i];
            keys.entry(format!("{parent}_{}", ti.graphql_alias)).or_insert_with(|| expr);
        }
    }
    // Explicit field aliases never override names registered above.
    for (alias_path, column) in field_aliases {
        keys.entry(alias_path.clone()).or_insert_with(|| format!("`{column}`"));
    }
    keys
}
528
/// Renders an aggregate call as SQL: `count()` for COUNT over `*`, otherwise
/// `<function lower-cased>(<column>)`. Plain column names are backtick-quoted;
/// anything containing '(' is treated as an expression and left untouched.
fn format_agg_sql(function: &str, column: &str) -> String {
    let normalized = function.to_uppercase();
    if normalized == "COUNT" && column == "*" {
        return "count()".to_string();
    }
    let quoted = if column.contains('(') {
        column.to_string()
    } else {
        format!("`{column}`")
    };
    // Both the UNIQ arm and the generic arm of the original emitted the
    // function name in lowercase, so a single formatting path suffices.
    format!("{}({quoted})", normalized.to_lowercase())
}
538
539fn format_dim_agg_sql(agg_type: &DimAggType, value_column: &str, compare_column: &str) -> String {
540 let func = match agg_type { DimAggType::ArgMax => "argMax", DimAggType::ArgMin => "argMin" };
541 let qval = if value_column.contains('(') { value_column.to_string() } else { format!("`{value_column}`") };
542 let qcmp = if compare_column.contains('(') { compare_column.to_string() } else { format!("`{compare_column}`") };
543 format!("{func}({qval}, {qcmp})")
544}
545
546fn resolve_calculate_expr(expression: &str, allowed_keys: &HashMap<String, String>) -> String {
548 let mut result = String::new();
549 let mut chars = expression.chars().peekable();
550 while let Some(ch) = chars.next() {
551 if ch == '$' {
552 let mut var_name = String::new();
553 while let Some(&c) = chars.peek() {
554 if c.is_alphanumeric() || c == '_' {
555 var_name.push(c);
556 chars.next();
557 } else {
558 break;
559 }
560 }
561 if !var_name.is_empty() {
562 if let Some(resolved) = allowed_keys.get(&var_name) {
563 let col_ref = if resolved.contains('(') { resolved.clone() } else { format!("`{resolved}`") };
564 result.push_str(&format!("toFloat64({col_ref})"));
565 } else {
566 result.push_str(&format!("toFloat64(`{}`)", metric_key(&var_name)));
567 }
568 } else {
569 result.push('$');
570 }
571 } else {
572 result.push(ch);
573 }
574 }
575 result
576}
577
/// Builds a ClickHouse `toStartOfInterval` expression for time bucketing.
/// Unrecognized units fall back to MINUTE.
fn time_interval_sql(column: &str, unit: &str, count: i64) -> String {
    const UNITS: [(&str, &str); 6] = [
        ("seconds", "SECOND"),
        ("minutes", "MINUTE"),
        ("hours", "HOUR"),
        ("days", "DAY"),
        ("weeks", "WEEK"),
        ("months", "MONTH"),
    ];
    let unit_sql = UNITS
        .iter()
        .find(|(gql, _)| *gql == unit)
        .map(|(_, sql)| *sql)
        .unwrap_or("MINUTE");
    format!("toStartOfInterval(`{column}`, INTERVAL {count} {unit_sql})")
}
585
/// Compiles a filter tree into an inline SQL boolean expression (used as the
/// condition string attached to conditional aggregates).
///
/// Returns an empty string for nodes that cannot be expressed inline; empty
/// parts are dropped from AND/OR groups.
fn compile_filter_inline(node: &FilterNode) -> String {
    match node {
        FilterNode::Empty => String::new(),
        FilterNode::Condition { column, op, value } => {
            // Expressions (containing '(') are emitted verbatim; bare column
            // names get backtick quoting.
            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
            // Unary operators (e.g. IS NULL style) take no right-hand value.
            if op.is_unary() {
                return format!("{col} {}", op.sql_op());
            }
            // NOTE(review): only single quotes are escaped; a value containing
            // backslashes could still break the quoting — confirm upstream
            // sanitization.
            let val_str = match value {
                SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
                SqlValue::Int(i) => i.to_string(),
                SqlValue::Float(f) => f.to_string(),
                SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
                SqlValue::Expression(e) => e.clone(),
            };
            match op {
                // IN/NOT IN: a string value is treated as a comma-separated
                // list and each item is trimmed and quoted individually.
                CompareOp::In | CompareOp::NotIn => {
                    if let SqlValue::String(csv) = value {
                        let items: Vec<String> = csv.split(',')
                            .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
                            .collect();
                        format!("{col} {} ({})", op.sql_op(), items.join(", "))
                    } else {
                        format!("{col} {} ({val_str})", op.sql_op())
                    }
                }
                // Substring match via LIKE; only string values get % wildcards.
                CompareOp::Includes => {
                    let like_val = match value {
                        SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
                        _ => val_str,
                    };
                    format!("{col} LIKE {like_val}")
                }
                _ => format!("{col} {} {val_str}", op.sql_op()),
            }
        }
        FilterNode::And(children) => {
            let parts: Vec<String> = children.iter()
                .map(compile_filter_inline)
                .filter(|s| !s.is_empty())
                .collect();
            // Single survivors are unwrapped to avoid redundant parentheses.
            match parts.len() {
                0 => String::new(),
                1 => parts.into_iter().next().unwrap(),
                _ => format!("({})", parts.join(" AND ")),
            }
        }
        FilterNode::Or(children) => {
            let parts: Vec<String> = children.iter()
                .map(compile_filter_inline)
                .filter(|s| !s.is_empty())
                .collect();
            match parts.len() {
                0 => String::new(),
                1 => parts.into_iter().next().unwrap(),
                _ => format!("({})", parts.join(" OR ")),
            }
        }
        // Array-inclusion filters have no inline representation here and are
        // silently dropped from the condition string.
        FilterNode::ArrayIncludes { .. } => {
            String::new()
        }
    }
}
653
654fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
657 let cols = collect_having_columns(having);
658 for col in cols {
659 if !col.contains('(') {
660 continue;
661 }
662 let already_present = selects.iter().any(|s| match s {
663 SelectExpr::Column { column, .. } => column == &col,
664 _ => false,
665 });
666 if !already_present {
667 selects.push(SelectExpr::Column {
668 column: col,
669 alias: None,
670 });
671 }
672 }
673}
674
675fn collect_having_columns(node: &FilterNode) -> Vec<String> {
676 match node {
677 FilterNode::Empty => vec![],
678 FilterNode::Condition { column, .. } => vec![column.clone()],
679 FilterNode::And(children) | FilterNode::Or(children) => {
680 children.iter().flat_map(collect_having_columns).collect()
681 }
682 FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
683 }
684}
685
/// Heuristic: a "column" containing '(' is treated as an aggregate/function
/// expression rather than a plain column reference.
fn is_aggregate_column(column: &str) -> bool {
    column.chars().any(|c| c == '(')
}
691
/// Splits a filter tree into (WHERE, HAVING) halves: conditions on plain
/// columns stay in WHERE, conditions on aggregate expressions move to HAVING.
fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
    match node {
        FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
        // A single condition goes wholly to one side based on its column.
        FilterNode::Condition { ref column, .. } => {
            if is_aggregate_column(column) {
                (FilterNode::Empty, node)
            } else {
                (node, FilterNode::Empty)
            }
        }
        // AND can be split child-by-child: (A AND B) is equivalent to the
        // WHERE-half AND the HAVING-half evaluated separately.
        FilterNode::And(children) => {
            let mut where_parts = Vec::new();
            let mut having_parts = Vec::new();
            for child in children {
                let (w, h) = split_aggregate_filters(child);
                if !w.is_empty() { where_parts.push(w); }
                if !h.is_empty() { having_parts.push(h); }
            }
            // Unwrap single survivors to avoid needless And nesting.
            let where_node = match where_parts.len() {
                0 => FilterNode::Empty,
                1 => where_parts.into_iter().next().unwrap(),
                _ => FilterNode::And(where_parts),
            };
            let having_node = match having_parts.len() {
                0 => FilterNode::Empty,
                1 => having_parts.into_iter().next().unwrap(),
                _ => FilterNode::And(having_parts),
            };
            (where_node, having_node)
        }
        // OR cannot be split without changing semantics, so if any branch
        // touches an aggregate the whole OR is evaluated in HAVING.
        FilterNode::Or(children) => {
            let any_aggregate = children.iter().any(filter_has_aggregate);
            if any_aggregate {
                (FilterNode::Empty, FilterNode::Or(children))
            } else {
                (FilterNode::Or(children), FilterNode::Empty)
            }
        }
        // Array-inclusion filters stay in WHERE. NOTE(review): assumes
        // ArrayIncludes never wraps an aggregate expression — confirm with
        // its producers.
        FilterNode::ArrayIncludes { .. } => {
            (node, FilterNode::Empty)
        }
    }
}
738
739fn filter_has_aggregate(node: &FilterNode) -> bool {
740 match node {
741 FilterNode::Empty => false,
742 FilterNode::Condition { column, .. } => is_aggregate_column(column),
743 FilterNode::And(children) | FilterNode::Or(children) => {
744 children.iter().any(filter_has_aggregate)
745 }
746 FilterNode::ArrayIncludes { .. } => false,
747 }
748}
749
750fn parse_limit_by(
751 args: &ObjectAccessor,
752 cube: &CubeDefinition,
753) -> Result<Option<LimitByExpr>, async_graphql::Error> {
754 let lb_val = match args.try_get("limitBy") {
755 Ok(v) => v,
756 Err(_) => return Ok(None),
757 };
758 let lb_obj = lb_val.object()?;
759 let count = lb_obj.try_get("count")?.i64()? as u32;
760 let offset = lb_obj
761 .try_get("offset")
762 .ok()
763 .and_then(|v| v.i64().ok())
764 .unwrap_or(0) as u32;
765 let by_str = lb_obj.try_get("by")?.string()?;
766
767 let flat = cube.flat_dimensions();
768 let columns: Vec<String> = by_str
769 .split(',')
770 .map(|s| {
771 let trimmed = s.trim();
772 flat.iter()
773 .find(|(path, _)| path == trimmed)
774 .map(|(_, dim)| dim.column.clone())
775 .unwrap_or_else(|| trimmed.to_string())
776 })
777 .collect();
778
779 if columns.is_empty() {
780 return Err(async_graphql::Error::new("limitBy.by must specify at least one field"));
781 }
782
783 Ok(Some(LimitByExpr { count, offset, columns }))
784}