1use std::collections::{HashMap, HashSet};
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8use crate::schema::generator::{
9 CalculateRequest, DimAggRequest, FieldAliasMap, QuantileRequest, TimeIntervalRequest,
10 metric_key, dim_agg_key,
11};
12
13pub struct MetricRequest {
15 pub function: String,
16 pub alias: String,
18 pub of_dimension: String,
19 pub select_where_value: Option<async_graphql::Value>,
21 pub condition_filter: Option<FilterNode>,
23}
24
25#[allow(clippy::too_many_arguments)]
26pub fn parse_cube_query(
27 cube: &CubeDefinition,
28 network: &str,
29 args: &ObjectAccessor,
30 metrics: &[MetricRequest],
31 quantiles: &[QuantileRequest],
32 calculates: &[CalculateRequest],
33 field_aliases: &FieldAliasMap,
34 dim_aggs: &[DimAggRequest],
35 time_intervals: &[TimeIntervalRequest],
36 requested_fields: Option<HashSet<String>>,
37) -> Result<QueryIR, async_graphql::Error> {
38 let flat = cube.flat_dimensions();
39 let requested_cols: Vec<String> = flat.iter()
40 .filter(|(path, _)| {
41 requested_fields.as_ref().is_none_or(|rf| rf.contains(path))
42 })
43 .map(|(_, dim)| dim.column.clone())
44 .collect();
45 let (schema, table) = cube.resolve_table(network, &requested_cols);
46
47 let filters = if let Ok(where_val) = args.try_get("where") {
48 if let Ok(where_obj) = where_val.object() {
49 filter::parse_where(&where_obj, &cube.dimensions)?
50 } else {
51 FilterNode::Empty
52 }
53 } else {
54 FilterNode::Empty
55 };
56
57 let filters = merge_selector_filters(filters, args, &cube.selectors)?;
58 let filters = if let Some(ref chain_col) = cube.chain_column {
59 let chain_filter = FilterNode::Condition {
60 column: chain_col.clone(),
61 op: CompareOp::Eq,
62 value: SqlValue::String(network.to_string()),
63 };
64 if filters.is_empty() {
65 chain_filter
66 } else {
67 FilterNode::And(vec![chain_filter, filters])
68 }
69 } else {
70 filters
71 };
72 let filters = apply_default_filters(filters, &cube.default_filters);
73 let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;
74
75 let mut selects: Vec<SelectExpr> = flat
76 .iter()
77 .filter(|(path, _)| {
78 requested_fields
79 .as_ref()
80 .is_none_or(|rf| rf.contains(path))
81 })
82 .map(|(_, dim)| SelectExpr::Column {
83 column: dim.column.clone(),
84 alias: None,
85 })
86 .collect();
87
88 let array_cols = cube.array_columns();
92 if !array_cols.is_empty() {
93 let selected_cols: HashSet<String> = selects.iter()
94 .filter_map(|s| match s {
95 SelectExpr::Column { column, .. } => Some(column.clone()),
96 _ => None,
97 })
98 .collect();
99 for (path, col) in &array_cols {
100 if selected_cols.contains(col) {
101 continue;
102 }
103 let should_include = requested_fields.as_ref().is_none_or(|rf| {
104 let parent = path.rsplit_once('_').map(|(p, _)| p).unwrap_or(path);
109 rf.iter().any(|f| f.starts_with(parent))
110 });
111 if should_include {
112 selects.push(SelectExpr::Column {
113 column: col.clone(),
114 alias: None,
115 });
116 }
117 }
118 }
119
120 if selects.is_empty() && !flat.is_empty() && metrics.is_empty() && dim_aggs.is_empty() {
121 selects = flat
122 .iter()
123 .map(|(_, dim)| SelectExpr::Column {
124 column: dim.column.clone(),
125 alias: None,
126 })
127 .collect();
128 }
129
130 for ti in time_intervals {
132 let interval_expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
133 let alias = dim_agg_key(&ti.graphql_alias);
134 for sel in &mut selects {
135 if let SelectExpr::Column { column, alias: ref mut a } = sel {
136 if column == &ti.column {
137 *column = interval_expr.clone();
138 *a = Some(alias.clone());
139 break;
140 }
141 }
142 }
143 }
144
145 let (filters, agg_having) = split_aggregate_filters(filters);
146 let mut group_by = Vec::new();
147 let mut having = agg_having;
148
149 if !metrics.is_empty() || !dim_aggs.is_empty() {
151 let agg_columns: HashSet<String> = dim_aggs.iter()
152 .map(|da| da.value_column.clone())
153 .collect();
154
155 group_by = selects
156 .iter()
157 .filter_map(|s| match s {
158 SelectExpr::Column { column, .. } if !agg_columns.contains(column) => {
159 Some(column.clone())
160 }
161 _ => None,
162 })
163 .collect();
164
165 for da in dim_aggs {
166 selects.retain(|s| !matches!(s, SelectExpr::Column { column, .. } if column == &da.value_column));
167 let alias = dim_agg_key(&da.graphql_alias);
168 let condition = da.condition_filter.as_ref().and_then(|f| {
169 let sql = compile_filter_inline(f);
170 if sql.is_empty() { None } else { Some(sql) }
171 });
172 let func_name = match da.agg_type {
173 DimAggType::ArgMax => "argMax",
174 DimAggType::ArgMin => "argMin",
175 };
176
177 selects.push(SelectExpr::DimAggregate {
178 agg_type: da.agg_type.clone(),
179 value_column: da.value_column.clone(),
180 compare_column: da.compare_column.clone(),
181 alias: alias.clone(),
182 condition,
183 });
184
185 if let Some(async_graphql::Value::Object(ref obj)) = da.select_where_value {
186 let agg_expr = format!("{func_name}(`{}`, `{}`)", da.value_column, da.compare_column);
187 let h = parse_select_where_from_value(obj, &agg_expr)?;
188 if !h.is_empty() {
189 having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
190 }
191 }
192 }
193
194 for m in metrics {
195 let dim_col = flat.iter()
196 .find(|(path, _)| path == &m.of_dimension)
197 .map(|(_, dim)| dim.column.clone())
198 .unwrap_or_else(|| "*".to_string());
199 let alias = metric_key(&m.alias);
200 let metric_def = cube.find_metric(&m.function);
201
202 if let Some(md) = metric_def.filter(|md| md.expression_template.is_some()) {
203 let tmpl = md.expression_template.as_ref().unwrap();
204 let expanded = tmpl.replace("{column}", &dim_col);
205 selects.push(SelectExpr::Column { column: expanded, alias: Some(alias) });
206 } else {
207 let func = m.function.to_uppercase();
208 let condition = m.condition_filter.as_ref().and_then(|f| {
209 let sql = compile_filter_inline(f);
210 if sql.is_empty() { None } else { Some(sql) }
211 });
212 selects.push(SelectExpr::Aggregate {
213 function: func.clone(), column: dim_col.clone(),
214 alias: alias.clone(), condition,
215 });
216 if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
217 let agg_expr = if func == "COUNT" && dim_col == "*" { "COUNT(*)".into() }
218 else if func == "COUNT" || func == "UNIQ" { format!("COUNT(DISTINCT `{dim_col}`)") }
219 else { format!("{func}(`{dim_col}`)") };
220 let h = parse_select_where_from_value(obj, &agg_expr)?;
221 if !h.is_empty() {
222 having = if having.is_empty() { h } else { FilterNode::And(vec![having, h]) };
223 }
224 }
225 }
226 }
227 }
228
229 for q in quantiles {
230 let dim_col = flat.iter()
231 .find(|(path, _)| path == &q.of_dimension)
232 .map(|(_, dim)| dim.column.clone())
233 .unwrap_or_else(|| "*".to_string());
234 let alias = metric_key(&q.alias);
235 let expr = format!("quantile({})(`{}`)", q.level, dim_col);
236 selects.push(SelectExpr::Column { column: expr, alias: Some(alias) });
237 if group_by.is_empty() && !selects.iter().any(|s| matches!(s, SelectExpr::Aggregate { .. })) {
238 group_by = selects.iter().filter_map(|s| match s {
239 SelectExpr::Column { column, alias } if alias.is_none() && !column.contains('(') => Some(column.clone()),
240 _ => None,
241 }).collect();
242 }
243 }
244
245 let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);
247
248 for calc in calculates {
249 let alias = metric_key(&calc.alias);
250 let resolved = resolve_calculate_expr(&calc.expression, &allowed_keys);
251 selects.push(SelectExpr::Column {
252 column: format!("ifNotFinite(({resolved}), 0)"),
253 alias: Some(alias),
254 });
255 }
256
257 ensure_having_columns_in_selects(&having, &mut selects);
258
259 let allowed_keys = collect_select_keys(&selects, &flat, field_aliases, dim_aggs, time_intervals);
260 let order_by = parse_order_by(args, cube, &allowed_keys)?;
261
262 if !group_by.is_empty() && !order_by.is_empty() {
267 let group_set: HashSet<&str> = group_by.iter().map(|s| s.as_str()).collect();
268 let select_exprs: HashSet<&str> = selects.iter().map(|s| match s {
269 SelectExpr::Column { column, .. } => column.as_str(),
270 SelectExpr::Aggregate { alias, .. } => alias.as_str(),
271 SelectExpr::DimAggregate { alias, .. } => alias.as_str(),
272 }).collect();
273 for o in &order_by {
274 let col = o.column.as_str();
275 let is_in_group = group_set.contains(col);
276 let is_aggregate = col.contains('(') || select_exprs.contains(col);
277 if !is_in_group && !is_aggregate {
278 let field_name = flat.iter()
279 .find(|(_, dim)| dim.column == col)
280 .map(|(path, _)| path.as_str())
281 .unwrap_or(col);
282 return Err(async_graphql::Error::new(format!(
283 "Cannot order by '{}' in aggregation query — add the field to your selection or order by an aggregated metric instead.",
284 field_name,
285 )));
286 }
287 }
288 }
289
290 let limit_by = parse_limit_by(args, cube)?;
291
292 let from_subquery = cube.from_subquery.as_ref().map(|s| {
293 s.replace("{schema}", &schema).replace("{chain}", network)
294 });
295
296 Ok(QueryIR {
297 cube: cube.name.clone(),
298 schema,
299 table,
300 selects,
301 filters,
302 having,
303 group_by,
304 order_by,
305 limit,
306 offset,
307 limit_by,
308 use_final: cube.use_final,
309 joins: Vec::new(),
310 custom_query_builder: cube.custom_query_builder.clone(),
311 from_subquery,
312 })
313}
314
315fn parse_select_where_from_value(
318 obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
319 aggregate_expr: &str,
320) -> Result<FilterNode, async_graphql::Error> {
321 let mut conditions = Vec::new();
322
323 for (key, op) in &[
324 ("eq", CompareOp::Eq),
325 ("ne", CompareOp::Ne),
326 ("gt", CompareOp::Gt),
327 ("ge", CompareOp::Ge),
328 ("lt", CompareOp::Lt),
329 ("le", CompareOp::Le),
330 ] {
331 if let Some(val) = obj.get(*key) {
332 let sql_val = match val {
333 async_graphql::Value::String(s) => {
334 if let Ok(f) = s.parse::<f64>() {
335 SqlValue::Float(f)
336 } else {
337 SqlValue::String(s.clone())
338 }
339 }
340 async_graphql::Value::Number(n) => {
341 if let Some(f) = n.as_f64() {
342 SqlValue::Float(f)
343 } else {
344 SqlValue::Int(n.as_i64().unwrap_or(0))
345 }
346 }
347 _ => continue,
348 };
349 conditions.push(FilterNode::Condition {
350 column: aggregate_expr.to_string(),
351 op: op.clone(),
352 value: sql_val,
353 });
354 }
355 }
356
357 Ok(match conditions.len() {
358 0 => FilterNode::Empty,
359 1 => conditions.into_iter().next().unwrap(),
360 _ => FilterNode::And(conditions),
361 })
362}
363
364fn merge_selector_filters(
365 base: FilterNode,
366 args: &ObjectAccessor,
367 selectors: &[SelectorDef],
368) -> Result<FilterNode, async_graphql::Error> {
369 let mut extra = Vec::new();
370
371 for sel in selectors {
372 if let Ok(val) = args.try_get(&sel.graphql_name) {
373 if let Ok(obj) = val.object() {
374 let leaf_filters =
375 filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
376 extra.extend(leaf_filters);
377 }
378 }
379 }
380
381 if extra.is_empty() {
382 return Ok(base);
383 }
384 if base.is_empty() {
385 return Ok(if extra.len() == 1 {
386 extra.remove(0)
387 } else {
388 FilterNode::And(extra)
389 });
390 }
391 extra.push(base);
392 Ok(FilterNode::And(extra))
393}
394
395fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
396 if defaults.is_empty() {
397 return user_filters;
398 }
399
400 let mut default_nodes: Vec<FilterNode> = defaults
401 .iter()
402 .map(|(col, val)| {
403 let sql_val = if val == "true" || val == "false" {
404 SqlValue::Bool(val == "true")
405 } else if let Ok(n) = val.parse::<i64>() {
406 SqlValue::Int(n)
407 } else {
408 SqlValue::String(val.clone())
409 };
410 FilterNode::Condition {
411 column: col.clone(),
412 op: CompareOp::Eq,
413 value: sql_val,
414 }
415 })
416 .collect();
417
418 if user_filters.is_empty() {
419 if default_nodes.len() == 1 {
420 return default_nodes.remove(0);
421 }
422 return FilterNode::And(default_nodes);
423 }
424
425 default_nodes.push(user_filters);
426 FilterNode::And(default_nodes)
427}
428
429fn parse_limit(
430 args: &ObjectAccessor,
431 default: u32,
432 max: u32,
433) -> Result<(u32, u32), async_graphql::Error> {
434 let mut limit = default;
435 let mut offset = 0u32;
436
437 if let Ok(limit_val) = args.try_get("limit") {
438 if let Ok(limit_obj) = limit_val.object() {
439 if let Ok(count) = limit_obj.try_get("count") {
440 limit = (count.i64()? as u32).min(max);
441 }
442 if let Ok(off) = limit_obj.try_get("offset") {
443 offset = off.i64()? as u32;
444 }
445 }
446 }
447
448 Ok((limit, offset))
449}
450
451fn parse_order_by(
452 args: &ObjectAccessor,
453 cube: &CubeDefinition,
454 allowed_keys: &HashMap<String, String>,
455) -> Result<Vec<OrderExpr>, async_graphql::Error> {
456 let order_val = match args.try_get("orderBy") {
457 Ok(v) => v,
458 Err(_) => return Ok(Vec::new()),
459 };
460
461 let obj = order_val.object()
462 .map_err(|_| async_graphql::Error::new("orderBy must be an object"))?;
463 let flat = cube.flat_dimensions();
464
465 if let Ok(field) = obj.try_get("descending") {
466 let path = field.enum_name()
467 .map_err(|_| async_graphql::Error::new("orderBy.descending must be an enum value"))?;
468 let column = flat.iter()
469 .find(|(p, _)| p == path)
470 .map(|(_, dim)| dim.column.clone())
471 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
472 return Ok(vec![OrderExpr { column, descending: true }]);
473 }
474
475 if let Ok(field) = obj.try_get("ascending") {
476 let path = field.enum_name()
477 .map_err(|_| async_graphql::Error::new("orderBy.ascending must be an enum value"))?;
478 let column = flat.iter()
479 .find(|(p, _)| p == path)
480 .map(|(_, dim)| dim.column.clone())
481 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {path}")))?;
482 return Ok(vec![OrderExpr { column, descending: false }]);
483 }
484
485 if let Ok(field_str) = obj.try_get("descendingByField") {
486 let name = field_str.string()
487 .map_err(|_| async_graphql::Error::new("descendingByField must be a string"))?;
488 let column = resolve_field_in_keys(name, allowed_keys)?;
489 return Ok(vec![OrderExpr { column, descending: true }]);
490 }
491
492 if let Ok(field_str) = obj.try_get("ascendingByField") {
493 let name = field_str.string()
494 .map_err(|_| async_graphql::Error::new("ascendingByField must be a string"))?;
495 let column = resolve_field_in_keys(name, allowed_keys)?;
496 return Ok(vec![OrderExpr { column, descending: false }]);
497 }
498
499 Ok(vec![])
500}
501
502fn resolve_field_in_keys(
505 name: &str,
506 allowed_keys: &HashMap<String, String>,
507) -> Result<String, async_graphql::Error> {
508 if let Some(expr) = allowed_keys.get(name) { return Ok(expr.clone()); }
509 Err(async_graphql::Error::new(format!(
510 "Can't use '{name}' in sorting/ordering. Field not found in executed query."
511 )))
512}
513
514fn collect_select_keys(
521 selects: &[SelectExpr],
522 flat: &[(String, crate::cube::definition::Dimension)],
523 field_aliases: &FieldAliasMap,
524 dim_aggs: &[DimAggRequest],
525 time_intervals: &[TimeIntervalRequest],
526) -> HashMap<String, String> {
527 let mut keys = HashMap::new();
528 for sel in selects {
529 match sel {
530 SelectExpr::Column { column, alias: Some(a) } => {
531 keys.insert(a.clone(), column.clone());
532 if let Some(name) = a.strip_prefix("__da_") {
533 keys.insert(name.to_string(), column.clone());
534 } else if let Some(name) = a.strip_prefix("__") {
535 keys.insert(name.to_string(), column.clone());
536 }
537 }
538 SelectExpr::Column { column, alias: None } => {
539 if let Some((path, _)) = flat.iter().find(|(_, d)| d.column == *column) {
540 keys.insert(path.clone(), column.clone());
541 }
542 keys.insert(column.clone(), column.clone());
543 }
544 SelectExpr::Aggregate { alias, function, column, .. } => {
545 let expr = format_agg_sql(function, column);
546 keys.insert(alias.clone(), expr.clone());
547 if let Some(name) = alias.strip_prefix("__") {
548 keys.insert(name.to_string(), expr);
549 }
550 }
551 SelectExpr::DimAggregate { alias, agg_type, value_column, compare_column, .. } => {
552 let expr = format_dim_agg_sql(agg_type, value_column, compare_column);
553 keys.insert(alias.clone(), expr.clone());
554 if let Some(name) = alias.strip_prefix("__da_") {
555 keys.insert(name.to_string(), expr);
556 }
557 }
558 }
559 }
560 for da in dim_aggs {
562 let suffix = match da.agg_type { DimAggType::ArgMax => "maximum", DimAggType::ArgMin => "minimum" };
563 let expr = format_dim_agg_sql(&da.agg_type, &da.value_column, &da.compare_column);
564 keys.entry(format!("{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
566 keys.entry(format!("{}_{suffix}", da.field_path)).or_insert_with(|| expr.clone());
568 if let Some(i) = da.field_path.rfind('_') {
569 let parent = &da.field_path[..i];
570 keys.entry(format!("{parent}_{}_{suffix}", da.graphql_alias)).or_insert_with(|| expr.clone());
572 keys.entry(format!("{parent}_{}", da.graphql_alias)).or_insert_with(|| expr.clone());
574 }
575 }
576 for ti in time_intervals {
578 let expr = time_interval_sql(&ti.column, &ti.unit, ti.count);
579 if let Some(i) = ti.field_path.rfind('_') {
580 let parent = &ti.field_path[..i];
581 keys.entry(format!("{parent}_{}", ti.graphql_alias)).or_insert_with(|| expr);
582 }
583 }
584 for (alias_path, column) in field_aliases {
585 keys.entry(alias_path.clone()).or_insert_with(|| format!("`{column}`"));
586 }
587 keys
588}
589
590fn format_agg_sql(function: &str, column: &str) -> String {
591 let func = function.to_uppercase();
592 let qcol = if column.contains('(') { column.to_string() } else { format!("`{column}`") };
593 match (func.as_str(), column) {
594 ("COUNT", "*") => "count()".to_string(),
595 ("UNIQ", _) => format!("uniq({qcol})"),
596 (f, _) => format!("{}({qcol})", f.to_lowercase()),
597 }
598}
599
600fn format_dim_agg_sql(agg_type: &DimAggType, value_column: &str, compare_column: &str) -> String {
601 let func = match agg_type { DimAggType::ArgMax => "argMax", DimAggType::ArgMin => "argMin" };
602 let qval = if value_column.contains('(') { value_column.to_string() } else { format!("`{value_column}`") };
603 let qcmp = if compare_column.contains('(') { compare_column.to_string() } else { format!("`{compare_column}`") };
604 format!("{func}({qval}, {qcmp})")
605}
606
607fn resolve_calculate_expr(expression: &str, allowed_keys: &HashMap<String, String>) -> String {
609 let mut result = String::new();
610 let mut chars = expression.chars().peekable();
611 while let Some(ch) = chars.next() {
612 if ch == '$' {
613 let mut var_name = String::new();
614 while let Some(&c) = chars.peek() {
615 if c.is_alphanumeric() || c == '_' {
616 var_name.push(c);
617 chars.next();
618 } else {
619 break;
620 }
621 }
622 if !var_name.is_empty() {
623 if let Some(resolved) = allowed_keys.get(&var_name) {
624 let col_ref = if resolved.contains('(') { resolved.clone() } else { format!("`{resolved}`") };
625 result.push_str(&format!("toFloat64({col_ref})"));
626 } else {
627 result.push_str(&format!("toFloat64(`{}`)", metric_key(&var_name)));
628 }
629 } else {
630 result.push('$');
631 }
632 } else {
633 result.push(ch);
634 }
635 }
636 result
637}
638
639fn time_interval_sql(column: &str, unit: &str, count: i64) -> String {
640 let unit_sql = match unit {
641 "seconds" => "SECOND", "minutes" => "MINUTE", "hours" => "HOUR",
642 "days" => "DAY", "weeks" => "WEEK", "months" => "MONTH", _ => "MINUTE",
643 };
644 format!("toStartOfInterval(`{column}`, INTERVAL {count} {unit_sql})")
645}
646
647fn compile_filter_inline(node: &FilterNode) -> String {
650 match node {
651 FilterNode::Empty => String::new(),
652 FilterNode::Condition { column, op, value } => {
653 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
654 if op.is_unary() {
655 return format!("{col} {}", op.sql_op());
656 }
657 let val_str = match value {
658 SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
659 SqlValue::Int(i) => i.to_string(),
660 SqlValue::Float(f) => f.to_string(),
661 SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
662 SqlValue::Expression(e) => e.clone(),
663 };
664 match op {
665 CompareOp::In | CompareOp::NotIn => {
666 if let SqlValue::String(csv) = value {
667 let items: Vec<String> = csv.split(',')
668 .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
669 .collect();
670 format!("{col} {} ({})", op.sql_op(), items.join(", "))
671 } else {
672 format!("{col} {} ({val_str})", op.sql_op())
673 }
674 }
675 CompareOp::Includes => {
676 let like_val = match value {
677 SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
678 _ => val_str,
679 };
680 format!("{col} LIKE {like_val}")
681 }
682 _ => format!("{col} {} {val_str}", op.sql_op()),
683 }
684 }
685 FilterNode::And(children) => {
686 let parts: Vec<String> = children.iter()
687 .map(compile_filter_inline)
688 .filter(|s| !s.is_empty())
689 .collect();
690 match parts.len() {
691 0 => String::new(),
692 1 => parts.into_iter().next().unwrap(),
693 _ => format!("({})", parts.join(" AND ")),
694 }
695 }
696 FilterNode::Or(children) => {
697 let parts: Vec<String> = children.iter()
698 .map(compile_filter_inline)
699 .filter(|s| !s.is_empty())
700 .collect();
701 match parts.len() {
702 0 => String::new(),
703 1 => parts.into_iter().next().unwrap(),
704 _ => format!("({})", parts.join(" OR ")),
705 }
706 }
707 FilterNode::ArrayIncludes { .. } => {
708 String::new()
711 }
712 }
713}
714
715fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
718 let cols = collect_having_columns(having);
719 for col in cols {
720 if !col.contains('(') {
721 continue;
722 }
723 let already_present = selects.iter().any(|s| match s {
724 SelectExpr::Column { column, .. } => column == &col,
725 _ => false,
726 });
727 if !already_present {
728 selects.push(SelectExpr::Column {
729 column: col,
730 alias: None,
731 });
732 }
733 }
734}
735
736fn collect_having_columns(node: &FilterNode) -> Vec<String> {
737 match node {
738 FilterNode::Empty => vec![],
739 FilterNode::Condition { column, .. } => vec![column.clone()],
740 FilterNode::And(children) | FilterNode::Or(children) => {
741 children.iter().flat_map(collect_having_columns).collect()
742 }
743 FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
744 }
745}
746
747fn is_aggregate_column(column: &str) -> bool {
750 column.contains('(')
751}
752
753fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
756 match node {
757 FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
758 FilterNode::Condition { ref column, .. } => {
759 if is_aggregate_column(column) {
760 (FilterNode::Empty, node)
761 } else {
762 (node, FilterNode::Empty)
763 }
764 }
765 FilterNode::And(children) => {
766 let mut where_parts = Vec::new();
767 let mut having_parts = Vec::new();
768 for child in children {
769 let (w, h) = split_aggregate_filters(child);
770 if !w.is_empty() { where_parts.push(w); }
771 if !h.is_empty() { having_parts.push(h); }
772 }
773 let where_node = match where_parts.len() {
774 0 => FilterNode::Empty,
775 1 => where_parts.into_iter().next().unwrap(),
776 _ => FilterNode::And(where_parts),
777 };
778 let having_node = match having_parts.len() {
779 0 => FilterNode::Empty,
780 1 => having_parts.into_iter().next().unwrap(),
781 _ => FilterNode::And(having_parts),
782 };
783 (where_node, having_node)
784 }
785 FilterNode::Or(children) => {
786 let any_aggregate = children.iter().any(filter_has_aggregate);
787 if any_aggregate {
788 (FilterNode::Empty, FilterNode::Or(children))
789 } else {
790 (FilterNode::Or(children), FilterNode::Empty)
791 }
792 }
793 FilterNode::ArrayIncludes { .. } => {
794 (node, FilterNode::Empty)
796 }
797 }
798}
799
800fn filter_has_aggregate(node: &FilterNode) -> bool {
801 match node {
802 FilterNode::Empty => false,
803 FilterNode::Condition { column, .. } => is_aggregate_column(column),
804 FilterNode::And(children) | FilterNode::Or(children) => {
805 children.iter().any(filter_has_aggregate)
806 }
807 FilterNode::ArrayIncludes { .. } => false,
808 }
809}
810
811fn parse_limit_by(
812 args: &ObjectAccessor,
813 cube: &CubeDefinition,
814) -> Result<Option<LimitByExpr>, async_graphql::Error> {
815 let lb_val = match args.try_get("limitBy") {
816 Ok(v) => v,
817 Err(_) => return Ok(None),
818 };
819 let lb_obj = lb_val.object()?;
820 let count = lb_obj.try_get("count")?.i64()? as u32;
821 let offset = lb_obj
822 .try_get("offset")
823 .ok()
824 .and_then(|v| v.i64().ok())
825 .unwrap_or(0) as u32;
826 let by_val = lb_obj.try_get("by")?;
827 let by_str = by_val.enum_name()?;
828
829 let flat = cube.flat_dimensions();
830 let column = flat.iter()
831 .find(|(path, _)| path == by_str)
832 .map(|(_, dim)| dim.column.clone())
833 .ok_or_else(|| async_graphql::Error::new(
834 format!("Unknown limitBy field: {by_str}")
835 ))?;
836
837 Ok(Some(LimitByExpr { count, offset, columns: vec![column] }))
838}