1use std::collections::{HashMap, HashSet};
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8use crate::schema::generator::{
9 CalculateRequest, DimAggRequest, FieldAliasMap, QuantileRequest, TimeIntervalRequest,
10 metric_key, dim_agg_key,
11};
12
13pub struct MetricRequest {
15 pub function: String,
16 pub alias: String,
18 pub of_dimension: String,
19 pub select_where_value: Option<async_graphql::Value>,
21 pub condition_filter: Option<FilterNode>,
23}
24
25#[allow(clippy::too_many_arguments)]
26pub fn parse_cube_query(
27 cube: &CubeDefinition,
28 network: &str,
29 args: &ObjectAccessor,
30 metrics: &[MetricRequest],
31 quantiles: &[QuantileRequest],
32 calculates: &[CalculateRequest],
33 field_aliases: &FieldAliasMap,
34 dim_aggs: &[DimAggRequest],
35 time_intervals: &[TimeIntervalRequest],
36 requested_fields: Option<HashSet<String>>,
37) -> Result<QueryIR, async_graphql::Error> {
38 let flat = cube.flat_dimensions();
39 let requested_cols: Vec<String> = flat.iter()
40 .filter(|(path, _)| {
41 requested_fields.as_ref().is_none_or(|rf| rf.contains(path))
42 })
43 .map(|(_, dim)| dim.column.clone())
44 .collect();
45 let (schema, table) = cube.resolve_table(network, &requested_cols);
46
47 let filters = if let Ok(where_val) = args.try_get("where") {
48 if let Ok(where_obj) = where_val.object() {
49 filter::parse_where(&where_obj, &cube.dimensions)?
50 } else {
51 FilterNode::Empty
52 }
53 } else {
54 FilterNode::Empty
55 };
56
57 let filters = merge_selector_filters(filters, args, &cube.selectors)?;
58 let filters = if let Some(ref chain_col) = cube.chain_column {
59 let chain_filter = FilterNode::Condition {
60 column: chain_col.clone(),
61 op: CompareOp::Eq,
62 value: SqlValue::String(network.to_string()),
63 };
64 if filters.is_empty() {
65 chain_filter
66 } else {
67 FilterNode::And(vec![chain_filter, filters])
68 }
69 } else {
70 filters
71 };
72 let filters = apply_default_filters(filters, &cube.default_filters);
73 let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;
74
75 let mut selects: Vec<SelectExpr> = flat
76 .iter()
77 .filter(|(path, _)| {
78 requested_fields
79 .as_ref()
80 .is_none_or(|rf| rf.contains(path))
81 })
82 .map(|(_, dim)| SelectExpr::Column {
83 column: dim.column.clone(),
84 alias: None,
85 })
86 .collect();
87
88 let array_cols = cube.array_columns();
92 if !array_cols.is_empty() {
93 let selected_cols: HashSet<String> = selects.iter()
94 .filter_map(|s| match s {
95 SelectExpr::Column { column, .. } => Some(column.clone()),
96 _ => None,
97 })
98 .collect();
99 for (path, col) in &array_cols {
100 if selected_cols.contains(col) {
101 continue;
102 }
103 let should_include = requested_fields.as_ref().is_none_or(|rf| {
104 let parent = path.rsplit_once('_').map(|(p, _)| p).unwrap_or(path);
109 rf.iter().any(|f| f.starts_with(parent))
110 });
111 if should_include {
112 selects.push(SelectExpr::Column {
113 column: col.clone(),
114 alias: None,
115 });
116 }
117 }
118 }
119
120 if selects.is_empty() && !flat.is_empty() && metrics.is_empty() && dim_aggs.is_empty() {
121 selects = flat
122 .iter()
123 .map(|(_, dim)| SelectExpr::Column {
124 column: dim.column.clone(),
125 alias: None,
126 })
127 .collect();
128 }
129
130 for ti in time_intervals {
132 let interval_expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
133 let alias = dim_agg_key(&ti.graphql_alias);
134 for sel in &mut selects {
135 if let SelectExpr::Column { column, alias: ref mut a } = sel {
136 if column == &ti.column {
137 *column = interval_expr.clone();
138 *a = Some(alias.clone());
139 break;
140 }
141 }
142 }
143 }
144
145 let (filters, agg_having) = split_aggregate_filters(filters);
146 let mut group_by = Vec::new();
147 let mut having = agg_having;
148
149 if !metrics.is_empty() || !dim_aggs.is_empty() {
151 let agg_columns: HashSet<String> = dim_aggs.iter()
152 .map(|da| da.value_column.clone())
153 .collect();
154
155 group_by = selects
156 .iter()
157 .filter_map(|s| match s {
158 SelectExpr::Column { column, .. } if !agg_columns.contains(column) => {
159 Some(column.clone())
160 }
161 _ => None,
162 })
163 .collect();
164
165 for da in dim_aggs {
166 selects.retain(|s| !matches!(s, SelectExpr::Column { column, .. } if column == &da.value_column));
167 let alias = dim_agg_key(&da.graphql_alias);
168 let condition = da.condition_filter.as_ref().and_then(|f| {
169 let sql = compile_filter_inline(f);
170 if sql.is_empty() { None } else { Some(sql) }
171 });
172 let func_name = match da.agg_type {
173 DimAggType::ArgMax => "argMax",
174 DimAggType::ArgMin => "argMin",
175 };
176
177 selects.push(SelectExpr::DimAggregate {
178 agg_type: da.agg_type.clone(),
179 value_column: da.value_column.clone(),
180 compare_column: da.compare_column.clone(),
181 alias: alias.clone(),
182 condition,
183 });
184
185 if let Some(async_graphql::Value::Object(ref obj)) = da.select_where_value {
186 let agg_expr = format!("{func_name}(`{}`, `{}`)", da.value_column, da.compare_column);
187 let h = parse_select_where_from_value(obj, &agg_expr)?;
188 if !h.is_empty() {
189 having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
190 }
191 }
192 }
193
194 for m in metrics {
195 let dim_col = flat.iter()
196 .find(|(path, _)| path == &m.of_dimension)
197 .map(|(_, dim)| dim.column.clone())
198 .unwrap_or_else(|| "*".to_string());
199 let alias = metric_key(&m.alias);
200 let metric_def = cube.find_metric(&m.function);
201
202 if let Some(md) = metric_def.filter(|md| md.expression_template.is_some()) {
203 let tmpl = md.expression_template.as_ref().unwrap();
204 let expanded = tmpl.replace("{column}", &dim_col);
205 selects.push(SelectExpr::Column { column: expanded, alias: Some(alias) });
206 } else {
207 let func = m.function.to_uppercase();
208 let condition = m.condition_filter.as_ref().and_then(|f| {
209 let sql = compile_filter_inline(f);
210 if sql.is_empty() { None } else { Some(sql) }
211 });
212 selects.push(SelectExpr::Aggregate {
213 function: func.clone(), column: dim_col.clone(),
214 alias: alias.clone(), condition,
215 });
216 if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
217 let agg_expr = if func == "COUNT" && dim_col == "*" { "COUNT(*)".into() }
218 else if func == "COUNT" || func == "UNIQ" { format!("COUNT(DISTINCT `{dim_col}`)") }
219 else { format!("{func}(`{dim_col}`)") };
220 let h = parse_select_where_from_value(obj, &agg_expr)?;
221 if !h.is_empty() {
222 having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
223 }
224 }
225 }
226 }
227 }
228
229 for q in quantiles {
230 let dim_col = flat.iter()
231 .find(|(path, _)| path == &q.of_dimension)
232 .map(|(_, dim)| dim.column.clone())
233 .unwrap_or_else(|| "*".to_string());
234 let alias = metric_key(&q.alias);
235 let expr = format!("quantile({})(`{}`)", q.level, dim_col);
236 selects.push(SelectExpr::Column { column: expr, alias: Some(alias) });
237 if group_by.is_empty() && !selects.iter().any(|s| matches!(s, SelectExpr::Aggregate { .. })) {
238 group_by = selects.iter().filter_map(|s| match s {
239 SelectExpr::Column { column, alias } if alias.is_none() && !column.contains('(') => Some(column.clone()),
240 _ => None,
241 }).collect();
242 }
243 }
244
245 let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);
247
248 for calc in calculates {
249 let alias = metric_key(&calc.alias);
250 let resolved = resolve_calculate_expr(&calc.expression, &allowed_keys);
251 selects.push(SelectExpr::Column {
252 column: format!("ifNotFinite(({resolved}), 0)"),
253 alias: Some(alias),
254 });
255 }
256
257 ensure_having_columns_in_selects(&having, &mut selects);
258
259 let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);
260 let order_by = parse_order_by(args, cube, &allowed_keys)?;
261 let limit_by = parse_limit_by(args, cube)?;
262
263 let from_subquery = cube.from_subquery.as_ref().map(|s| {
264 s.replace("{schema}", &schema).replace("{chain}", network)
265 });
266
267 Ok(QueryIR {
268 cube: cube.name.clone(),
269 schema,
270 table,
271 selects,
272 filters,
273 having,
274 group_by,
275 order_by,
276 limit,
277 offset,
278 limit_by,
279 use_final: cube.use_final,
280 joins: Vec::new(),
281 custom_query_builder: cube.custom_query_builder.clone(),
282 from_subquery,
283 })
284}
285
286fn parse_select_where_from_value(
289 obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
290 aggregate_expr: &str,
291) -> Result<FilterNode, async_graphql::Error> {
292 let mut conditions = Vec::new();
293
294 for (key, op) in &[
295 ("eq", CompareOp::Eq),
296 ("ne", CompareOp::Ne),
297 ("gt", CompareOp::Gt),
298 ("ge", CompareOp::Ge),
299 ("lt", CompareOp::Lt),
300 ("le", CompareOp::Le),
301 ] {
302 if let Some(val) = obj.get(*key) {
303 let sql_val = match val {
304 async_graphql::Value::String(s) => {
305 if let Ok(f) = s.parse::<f64>() {
306 SqlValue::Float(f)
307 } else {
308 SqlValue::String(s.clone())
309 }
310 }
311 async_graphql::Value::Number(n) => {
312 if let Some(f) = n.as_f64() {
313 SqlValue::Float(f)
314 } else {
315 SqlValue::Int(n.as_i64().unwrap_or(0))
316 }
317 }
318 _ => continue,
319 };
320 conditions.push(FilterNode::Condition {
321 column: aggregate_expr.to_string(),
322 op: op.clone(),
323 value: sql_val,
324 });
325 }
326 }
327
328 Ok(match conditions.len() {
329 0 => FilterNode::Empty,
330 1 => conditions.into_iter().next().unwrap(),
331 _ => FilterNode::And(conditions),
332 })
333}
334
335fn merge_selector_filters(
336 base: FilterNode,
337 args: &ObjectAccessor,
338 selectors: &[SelectorDef],
339) -> Result<FilterNode, async_graphql::Error> {
340 let mut extra = Vec::new();
341
342 for sel in selectors {
343 if let Ok(val) = args.try_get(&sel.graphql_name) {
344 if let Ok(obj) = val.object() {
345 let leaf_filters =
346 filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
347 extra.extend(leaf_filters);
348 }
349 }
350 }
351
352 if extra.is_empty() {
353 return Ok(base);
354 }
355 if base.is_empty() {
356 return Ok(if extra.len() == 1 {
357 extra.remove(0)
358 } else {
359 FilterNode::And(extra)
360 });
361 }
362 extra.push(base);
363 Ok(FilterNode::And(extra))
364}
365
366fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
367 if defaults.is_empty() {
368 return user_filters;
369 }
370
371 let mut default_nodes: Vec<FilterNode> = defaults
372 .iter()
373 .map(|(col, val)| {
374 let sql_val = if val == "true" || val == "false" {
375 SqlValue::Bool(val == "true")
376 } else if let Ok(n) = val.parse::<i64>() {
377 SqlValue::Int(n)
378 } else {
379 SqlValue::String(val.clone())
380 };
381 FilterNode::Condition {
382 column: col.clone(),
383 op: CompareOp::Eq,
384 value: sql_val,
385 }
386 })
387 .collect();
388
389 if user_filters.is_empty() {
390 if default_nodes.len() == 1 {
391 return default_nodes.remove(0);
392 }
393 return FilterNode::And(default_nodes);
394 }
395
396 default_nodes.push(user_filters);
397 FilterNode::And(default_nodes)
398}
399
400fn parse_limit(
401 args: &ObjectAccessor,
402 default: u32,
403 max: u32,
404) -> Result<(u32, u32), async_graphql::Error> {
405 let mut limit = default;
406 let mut offset = 0u32;
407
408 if let Ok(limit_val) = args.try_get("limit") {
409 if let Ok(limit_obj) = limit_val.object() {
410 if let Ok(count) = limit_obj.try_get("count") {
411 limit = (count.i64()? as u32).min(max);
412 }
413 if let Ok(off) = limit_obj.try_get("offset") {
414 offset = off.i64()? as u32;
415 }
416 }
417 }
418
419 Ok((limit, offset))
420}
421
422fn parse_order_by(
423 args: &ObjectAccessor,
424 cube: &CubeDefinition,
425 allowed_keys: &HashMap<String, String>,
426) -> Result<Vec<OrderExpr>, async_graphql::Error> {
427 let order_val = match args.try_get("orderBy") {
428 Ok(v) => v,
429 Err(_) => return Ok(Vec::new()),
430 };
431
432 let obj = order_val.object()
433 .map_err(|_| async_graphql::Error::new("orderBy must be an object"))?;
434 let flat = cube.flat_dimensions();
435
436 if let Ok(field) = obj.try_get("descending") {
437 let path = field.enum_name()
438 .map_err(|_| async_graphql::Error::new("orderBy.descending must be an enum value"))?;
439 let column = flat.iter()
440 .find(|(p, _)| p == path)
441 .map(|(_, dim)| dim.column.clone())
442 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
443 return Ok(vec![OrderExpr { column, descending: true }]);
444 }
445
446 if let Ok(field) = obj.try_get("ascending") {
447 let path = field.enum_name()
448 .map_err(|_| async_graphql::Error::new("orderBy.ascending must be an enum value"))?;
449 let column = flat.iter()
450 .find(|(p, _)| p == path)
451 .map(|(_, dim)| dim.column.clone())
452 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
453 return Ok(vec![OrderExpr { column, descending: false }]);
454 }
455
456 if let Ok(field_str) = obj.try_get("descendingByField") {
457 let name = field_str.string()
458 .map_err(|_| async_graphql::Error::new("descendingByField must be a string"))?;
459 let column = resolve_field_in_keys(name, allowed_keys)?;
460 return Ok(vec![OrderExpr { column, descending: true }]);
461 }
462
463 if let Ok(field_str) = obj.try_get("ascendingByField") {
464 let name = field_str.string()
465 .map_err(|_| async_graphql::Error::new("ascendingByField must be a string"))?;
466 let column = resolve_field_in_keys(name, allowed_keys)?;
467 return Ok(vec![OrderExpr { column, descending: false }]);
468 }
469
470 Ok(vec![])
471}
472
473fn resolve_field_in_keys(
476 name: &str,
477 allowed_keys: &HashMap<String, String>,
478) -> Result<String, async_graphql::Error> {
479 if let Some(expr) = allowed_keys.get(name) { return Ok(expr.clone()); }
480 Err(async_graphql::Error::new(format!(
481 "Can't use '{name}' in sorting/ordering. Field not found in executed query."
482 )))
483}
484
485fn collect_select_keys(
492 selects: &[SelectExpr],
493 flat: &[(String, crate::cube::definition::Dimension)],
494 field_aliases: &FieldAliasMap,
495 dim_aggs: &[DimAggRequest],
496 time_intervals: &[TimeIntervalRequest],
497) -> HashMap<String, String> {
498 let mut keys = HashMap::new();
499 for sel in selects {
500 match sel {
501 SelectExpr::Column { column, alias: Some(a) } => {
502 keys.insert(a.clone(), column.clone());
503 if let Some(name) = a.strip_prefix("__da_") {
504 keys.insert(name.to_string(), column.clone());
505 } else if let Some(name) = a.strip_prefix("__") {
506 keys.insert(name.to_string(), column.clone());
507 }
508 }
509 SelectExpr::Column { column, alias: None } => {
510 if let Some((path, _)) = flat.iter().find(|(_, d)| d.column == *column) {
511 keys.insert(path.clone(), column.clone());
512 }
513 keys.insert(column.clone(), column.clone());
514 }
515 SelectExpr::Aggregate { alias, function, column, .. } => {
516 let expr = format_agg_sql(function, column);
517 keys.insert(alias.clone(), expr.clone());
518 if let Some(name) = alias.strip_prefix("__") {
519 keys.insert(name.to_string(), expr);
520 }
521 }
522 SelectExpr::DimAggregate { alias, agg_type, value_column, compare_column, .. } => {
523 let expr = format_dim_agg_sql(agg_type, value_column, compare_column);
524 keys.insert(alias.clone(), expr.clone());
525 if let Some(name) = alias.strip_prefix("__da_") {
526 keys.insert(name.to_string(), expr);
527 }
528 }
529 }
530 }
531 for da in dim_aggs {
533 let suffix = match da.agg_type { DimAggType::ArgMax => "maximum", DimAggType::ArgMin => "minimum" };
534 let expr = format_dim_agg_sql(&da.agg_type, &da.value_column, &da.compare_column);
535 keys.entry(format!("{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
537 keys.entry(format!("{}_{suffix}", da.field_path)).or_insert_with(|| expr.clone());
539 if let Some(i) = da.field_path.rfind('_') {
540 let parent = &da.field_path[..i];
541 keys.entry(format!("{parent}_{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
543 keys.entry(format!("{parent}_{}", da.graphql_alias)).or_insert_with(|| expr.clone());
545 }
546 }
547 for ti in time_intervals {
549 let expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
550 if let Some(i) = ti.field_path.rfind('_') {
551 let parent = &ti.field_path[..i];
552 keys.entry(format!("{parent}_{}", ti.graphql_alias)).or_insert_with(|| expr);
553 }
554 }
555 for (alias_path, column) in field_aliases {
556 keys.entry(alias_path.clone()).or_insert_with(|| format!("`{column}`"));
557 }
558 keys
559}
560
561fn format_agg_sql(function: &str, column: &str) -> String {
562 let func = function.to_uppercase();
563 let qcol = if column.contains('(') { column.to_string() } else { format!("`{column}`") };
564 match (func.as_str(), column) {
565 ("COUNT", "*") => "count()".to_string(),
566 ("UNIQ", _) => format!("uniq({qcol})"),
567 (f, _) => format!("{}({qcol})", f.to_lowercase()),
568 }
569}
570
571fn format_dim_agg_sql(agg_type: &DimAggType, value_column: &str, compare_column: &str) -> String {
572 let func = match agg_type { DimAggType::ArgMax => "argMax", DimAggType::ArgMin => "argMin" };
573 let qval = if value_column.contains('(') { value_column.to_string() } else { format!("`{value_column}`") };
574 let qcmp = if compare_column.contains('(') { compare_column.to_string() } else { format!("`{compare_column}`") };
575 format!("{func}({qval}, {qcmp})")
576}
577
578fn resolve_calculate_expr(expression: &str, allowed_keys: &HashMap<String, String>) -> String {
580 let mut result = String::new();
581 let mut chars = expression.chars().peekable();
582 while let Some(ch) = chars.next() {
583 if ch == '$' {
584 let mut var_name = String::new();
585 while let Some(&c) = chars.peek() {
586 if c.is_alphanumeric() || c == '_' {
587 var_name.push(c);
588 chars.next();
589 } else {
590 break;
591 }
592 }
593 if !var_name.is_empty() {
594 if let Some(resolved) = allowed_keys.get(&var_name) {
595 let col_ref = if resolved.contains('(') { resolved.clone() } else { format!("`{resolved}`") };
596 result.push_str(&format!("toFloat64({col_ref})"));
597 } else {
598 result.push_str(&format!("toFloat64(`{}`)", metric_key(&var_name)));
599 }
600 } else {
601 result.push('$');
602 }
603 } else {
604 result.push(ch);
605 }
606 }
607 result
608}
609
610fn time_interval_sql(column: &str, unit: &str, count: i64) -> String {
611 let unit_sql = match unit {
612 "seconds" => "SECOND", "minutes" => "MINUTE", "hours" => "HOUR",
613 "days" => "DAY", "weeks" => "WEEK", "months" => "MONTH", _ => "MINUTE",
614 };
615 format!("toStartOfInterval(`{column}`, INTERVAL {count} {unit_sql})")
616}
617
618fn compile_filter_inline(node: &FilterNode) -> String {
621 match node {
622 FilterNode::Empty => String::new(),
623 FilterNode::Condition { column, op, value } => {
624 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
625 if op.is_unary() {
626 return format!("{col} {}", op.sql_op());
627 }
628 let val_str = match value {
629 SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
630 SqlValue::Int(i) => i.to_string(),
631 SqlValue::Float(f) => f.to_string(),
632 SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
633 SqlValue::Expression(e) => e.clone(),
634 };
635 match op {
636 CompareOp::In | CompareOp::NotIn => {
637 if let SqlValue::String(csv) = value {
638 let items: Vec<String> = csv.split(',')
639 .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
640 .collect();
641 format!("{col} {} ({})", op.sql_op(), items.join(", "))
642 } else {
643 format!("{col} {} ({val_str})", op.sql_op())
644 }
645 }
646 CompareOp::Includes => {
647 let like_val = match value {
648 SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
649 _ => val_str,
650 };
651 format!("{col} LIKE {like_val}")
652 }
653 _ => format!("{col} {} {val_str}", op.sql_op()),
654 }
655 }
656 FilterNode::And(children) => {
657 let parts: Vec<String> = children.iter()
658 .map(compile_filter_inline)
659 .filter(|s| !s.is_empty())
660 .collect();
661 match parts.len() {
662 0 => String::new(),
663 1 => parts.into_iter().next().unwrap(),
664 _ => format!("({})", parts.join(" AND ")),
665 }
666 }
667 FilterNode::Or(children) => {
668 let parts: Vec<String> = children.iter()
669 .map(compile_filter_inline)
670 .filter(|s| !s.is_empty())
671 .collect();
672 match parts.len() {
673 0 => String::new(),
674 1 => parts.into_iter().next().unwrap(),
675 _ => format!("({})", parts.join(" OR ")),
676 }
677 }
678 FilterNode::ArrayIncludes { .. } => {
679 String::new()
682 }
683 }
684}
685
686fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
689 let cols = collect_having_columns(having);
690 for col in cols {
691 if !col.contains('(') {
692 continue;
693 }
694 let already_present = selects.iter().any(|s| match s {
695 SelectExpr::Column { column, .. } => column == &col,
696 _ => false,
697 });
698 if !already_present {
699 selects.push(SelectExpr::Column {
700 column: col,
701 alias: None,
702 });
703 }
704 }
705}
706
707fn collect_having_columns(node: &FilterNode) -> Vec<String> {
708 match node {
709 FilterNode::Empty => vec![],
710 FilterNode::Condition { column, .. } => vec![column.clone()],
711 FilterNode::And(children) | FilterNode::Or(children) => {
712 children.iter().flat_map(collect_having_columns).collect()
713 }
714 FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
715 }
716}
717
718fn is_aggregate_column(column: &str) -> bool {
721 column.contains('(')
722}
723
724fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
727 match node {
728 FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
729 FilterNode::Condition { ref column, .. } => {
730 if is_aggregate_column(column) {
731 (FilterNode::Empty, node)
732 } else {
733 (node, FilterNode::Empty)
734 }
735 }
736 FilterNode::And(children) => {
737 let mut where_parts = Vec::new();
738 let mut having_parts = Vec::new();
739 for child in children {
740 let (w, h) = split_aggregate_filters(child);
741 if !w.is_empty() { where_parts.push(w); }
742 if !h.is_empty() { having_parts.push(h); }
743 }
744 let where_node = match where_parts.len() {
745 0 => FilterNode::Empty,
746 1 => where_parts.into_iter().next().unwrap(),
747 _ => FilterNode::And(where_parts),
748 };
749 let having_node = match having_parts.len() {
750 0 => FilterNode::Empty,
751 1 => having_parts.into_iter().next().unwrap(),
752 _ => FilterNode::And(having_parts),
753 };
754 (where_node, having_node)
755 }
756 FilterNode::Or(children) => {
757 let any_aggregate = children.iter().any(filter_has_aggregate);
758 if any_aggregate {
759 (FilterNode::Empty, FilterNode::Or(children))
760 } else {
761 (FilterNode::Or(children), FilterNode::Empty)
762 }
763 }
764 FilterNode::ArrayIncludes { .. } => {
765 (node, FilterNode::Empty)
767 }
768 }
769}
770
771fn filter_has_aggregate(node: &FilterNode) -> bool {
772 match node {
773 FilterNode::Empty => false,
774 FilterNode::Condition { column, .. } => is_aggregate_column(column),
775 FilterNode::And(children) | FilterNode::Or(children) => {
776 children.iter().any(filter_has_aggregate)
777 }
778 FilterNode::ArrayIncludes { .. } => false,
779 }
780}
781
782fn parse_limit_by(
783 args: &ObjectAccessor,
784 cube: &CubeDefinition,
785) -> Result<Option<LimitByExpr>, async_graphql::Error> {
786 let lb_val = match args.try_get("limitBy") {
787 Ok(v) => v,
788 Err(_) => return Ok(None),
789 };
790 let lb_obj = lb_val.object()?;
791 let count = lb_obj.try_get("count")?.i64()? as u32;
792 let offset = lb_obj
793 .try_get("offset")
794 .ok()
795 .and_then(|v| v.i64().ok())
796 .unwrap_or(0) as u32;
797 let by_val = lb_obj.try_get("by")?;
798 let by_str = by_val.enum_name()?;
799
800 let flat = cube.flat_dimensions();
801 let column = flat.iter()
802 .find(|(path, _)| path == by_str)
803 .map(|(_, dim)| dim.column.clone())
804 .ok_or_else(|| async_graphql::Error::new(
805 format!("Unknown limitBy field: {by_str}")
806 ))?;
807
808 Ok(Some(LimitByExpr { count, offset, columns: vec![column] }))
809}