1use std::collections::HashSet;
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8
9pub struct MetricRequest {
11 pub function: String,
12 pub of_dimension: String,
13 pub select_where_value: Option<async_graphql::Value>,
15 pub condition_filter: Option<FilterNode>,
17}
18
19pub fn parse_cube_query(
20 cube: &CubeDefinition,
21 network: &str,
22 args: &ObjectAccessor,
23 metrics: &[MetricRequest],
24 requested_fields: Option<HashSet<String>>,
25) -> Result<QueryIR, async_graphql::Error> {
26 let flat = cube.flat_dimensions();
27 let requested_cols: Vec<String> = flat.iter()
28 .filter(|(path, _)| {
29 requested_fields.as_ref().is_none_or(|rf| rf.contains(path))
30 })
31 .map(|(_, dim)| dim.column.clone())
32 .collect();
33 let (schema, table) = cube.resolve_table(network, &requested_cols);
34
35 let filters = if let Ok(where_val) = args.try_get("where") {
36 if let Ok(where_obj) = where_val.object() {
37 filter::parse_where(&where_obj, &cube.dimensions)?
38 } else {
39 FilterNode::Empty
40 }
41 } else {
42 FilterNode::Empty
43 };
44
45 let filters = merge_selector_filters(filters, args, &cube.selectors)?;
46 let filters = if let Some(ref chain_col) = cube.chain_column {
49 let chain_filter = FilterNode::Condition {
50 column: chain_col.clone(),
51 op: CompareOp::Eq,
52 value: SqlValue::String(network.to_string()),
53 };
54 if filters.is_empty() {
55 chain_filter
56 } else {
57 FilterNode::And(vec![chain_filter, filters])
58 }
59 } else {
60 filters
61 };
62 let filters = apply_default_filters(filters, &cube.default_filters);
63 let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;
64 let order_by = parse_order_by(args, cube)?;
65
66 let mut selects: Vec<SelectExpr> = flat
67 .iter()
68 .filter(|(path, _)| {
69 requested_fields
70 .as_ref()
71 .is_none_or(|rf| rf.contains(path))
72 })
73 .map(|(_, dim)| SelectExpr::Column {
74 column: dim.column.clone(),
75 alias: None,
76 })
77 .collect();
78
79 if selects.is_empty() && !flat.is_empty() && metrics.is_empty() {
83 selects = flat
84 .iter()
85 .map(|(_, dim)| SelectExpr::Column {
86 column: dim.column.clone(),
87 alias: None,
88 })
89 .collect();
90 }
91
92 let (filters, agg_having) = split_aggregate_filters(filters);
95
96 let mut group_by = Vec::new();
97 let mut having = agg_having;
98
99 if !metrics.is_empty() {
100 group_by = selects
101 .iter()
102 .filter_map(|s| match s {
103 SelectExpr::Column { column, .. } => Some(column.clone()),
104 _ => None,
105 })
106 .collect();
107
108 for m in metrics {
109 let dim_col = flat
110 .iter()
111 .find(|(path, _)| path == &m.of_dimension)
112 .map(|(_, dim)| dim.column.clone())
113 .unwrap_or_else(|| "*".to_string());
114
115 let alias = format!("__{}", m.function);
116 let metric_def = cube.find_metric(&m.function);
117
118 if let Some(md) = metric_def.filter(|md| md.expression_template.is_some()) {
119 let tmpl = md.expression_template.as_ref().unwrap();
120 let expanded = tmpl.replace("{column}", &dim_col);
121 selects.push(SelectExpr::Column {
122 column: expanded,
123 alias: Some(alias),
124 });
125 } else {
126 let func = m.function.to_uppercase();
127 let condition = m.condition_filter.as_ref().and_then(|f| {
128 let sql = compile_filter_inline(f);
129 if sql.is_empty() { None } else { Some(sql) }
130 });
131
132 selects.push(SelectExpr::Aggregate {
133 function: func.clone(),
134 column: dim_col.clone(),
135 alias: alias.clone(),
136 condition,
137 });
138
139 if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
140 let agg_expr = if func == "COUNT" && dim_col == "*" {
141 "COUNT(*)".to_string()
142 } else if func == "UNIQ" {
143 format!("COUNT(DISTINCT `{dim_col}`)")
144 } else {
145 format!("{func}(`{dim_col}`)")
146 };
147
148 let h = parse_select_where_from_value(obj, &agg_expr)?;
149 if !h.is_empty() {
150 having = if having.is_empty() {
151 h
152 } else {
153 FilterNode::And(vec![having, h])
154 };
155 }
156 }
157 }
158 }
159 }
160
161 ensure_having_columns_in_selects(&having, &mut selects);
164
165 let limit_by = parse_limit_by(args, cube)?;
166
167 Ok(QueryIR {
168 cube: cube.name.clone(),
169 schema,
170 table,
171 selects,
172 filters,
173 having,
174 group_by,
175 order_by,
176 limit,
177 offset,
178 limit_by,
179 use_final: cube.use_final,
180 joins: Vec::new(),
181 custom_query_builder: cube.custom_query_builder.clone(),
182 })
183}
184
185fn parse_select_where_from_value(
188 obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
189 aggregate_expr: &str,
190) -> Result<FilterNode, async_graphql::Error> {
191 let mut conditions = Vec::new();
192
193 for (key, op) in &[
194 ("eq", CompareOp::Eq),
195 ("gt", CompareOp::Gt),
196 ("ge", CompareOp::Ge),
197 ("lt", CompareOp::Lt),
198 ("le", CompareOp::Le),
199 ] {
200 if let Some(val) = obj.get(*key) {
201 let sql_val = match val {
202 async_graphql::Value::String(s) => {
203 if let Ok(f) = s.parse::<f64>() {
204 SqlValue::Float(f)
205 } else {
206 SqlValue::String(s.clone())
207 }
208 }
209 async_graphql::Value::Number(n) => {
210 if let Some(f) = n.as_f64() {
211 SqlValue::Float(f)
212 } else {
213 SqlValue::Int(n.as_i64().unwrap_or(0))
214 }
215 }
216 _ => continue,
217 };
218 conditions.push(FilterNode::Condition {
219 column: aggregate_expr.to_string(),
220 op: op.clone(),
221 value: sql_val,
222 });
223 }
224 }
225
226 Ok(match conditions.len() {
227 0 => FilterNode::Empty,
228 1 => conditions.into_iter().next().unwrap(),
229 _ => FilterNode::And(conditions),
230 })
231}
232
233fn merge_selector_filters(
234 base: FilterNode,
235 args: &ObjectAccessor,
236 selectors: &[SelectorDef],
237) -> Result<FilterNode, async_graphql::Error> {
238 let mut extra = Vec::new();
239
240 for sel in selectors {
241 if let Ok(val) = args.try_get(&sel.graphql_name) {
242 if let Ok(obj) = val.object() {
243 let leaf_filters =
244 filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
245 extra.extend(leaf_filters);
246 }
247 }
248 }
249
250 if extra.is_empty() {
251 return Ok(base);
252 }
253 if base.is_empty() {
254 return Ok(if extra.len() == 1 {
255 extra.remove(0)
256 } else {
257 FilterNode::And(extra)
258 });
259 }
260 extra.push(base);
261 Ok(FilterNode::And(extra))
262}
263
264fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
265 if defaults.is_empty() {
266 return user_filters;
267 }
268
269 let mut default_nodes: Vec<FilterNode> = defaults
270 .iter()
271 .map(|(col, val)| {
272 let sql_val = if val == "true" || val == "false" {
273 SqlValue::Bool(val == "true")
274 } else if let Ok(n) = val.parse::<i64>() {
275 SqlValue::Int(n)
276 } else {
277 SqlValue::String(val.clone())
278 };
279 FilterNode::Condition {
280 column: col.clone(),
281 op: CompareOp::Eq,
282 value: sql_val,
283 }
284 })
285 .collect();
286
287 if user_filters.is_empty() {
288 if default_nodes.len() == 1 {
289 return default_nodes.remove(0);
290 }
291 return FilterNode::And(default_nodes);
292 }
293
294 default_nodes.push(user_filters);
295 FilterNode::And(default_nodes)
296}
297
298fn parse_limit(
299 args: &ObjectAccessor,
300 default: u32,
301 max: u32,
302) -> Result<(u32, u32), async_graphql::Error> {
303 let mut limit = default;
304 let mut offset = 0u32;
305
306 if let Ok(limit_val) = args.try_get("limit") {
307 if let Ok(limit_obj) = limit_val.object() {
308 if let Ok(count) = limit_obj.try_get("count") {
309 limit = (count.i64()? as u32).min(max);
310 }
311 if let Ok(off) = limit_obj.try_get("offset") {
312 offset = off.i64()? as u32;
313 }
314 }
315 }
316
317 Ok((limit, offset))
318}
319
320fn parse_order_by(
321 args: &ObjectAccessor,
322 cube: &CubeDefinition,
323) -> Result<Vec<OrderExpr>, async_graphql::Error> {
324 let flat = cube.flat_dimensions();
325
326 if let Ok(list_val) = args.try_get("orderByList") {
327 if let Ok(list) = list_val.list() {
328 let mut orders = Vec::new();
329 for item in list.iter() {
330 let obj = item.object()
331 .map_err(|_| async_graphql::Error::new("orderByList items must be objects"))?;
332 let field_accessor = obj.try_get("field")
333 .map_err(|_| async_graphql::Error::new("orderByList item requires 'field'"))?;
334 let field_str = field_accessor.enum_name()
335 .map_err(|_| async_graphql::Error::new("orderByList 'field' must be an enum value"))?;
336 let descending = if let Ok(dir_accessor) = obj.try_get("direction") {
337 dir_accessor.enum_name() == Ok("DESC")
338 } else {
339 false
340 };
341 let column = flat.iter()
342 .find(|(p, _)| p == field_str)
343 .map(|(_, dim)| dim.column.clone())
344 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {field_str}")))?;
345 orders.push(OrderExpr { column, descending });
346 }
347 if !orders.is_empty() {
348 return Ok(orders);
349 }
350 }
351 }
352
353 let order_val = match args.try_get("orderBy") {
354 Ok(v) => v,
355 Err(_) => return Ok(Vec::new()),
356 };
357
358 let enum_str = order_val
359 .enum_name()
360 .map_err(|_| async_graphql::Error::new("orderBy must be an enum value"))?;
361
362 let (descending, field_path) = if let Some(path) = enum_str.strip_suffix("_DESC") {
363 (true, path)
364 } else if let Some(path) = enum_str.strip_suffix("_ASC") {
365 (false, path)
366 } else {
367 return Err(async_graphql::Error::new(format!(
368 "Invalid orderBy value: {enum_str}"
369 )));
370 };
371
372 let column = flat
373 .iter()
374 .find(|(p, _)| p == field_path)
375 .map(|(_, dim)| dim.column.clone())
376 .ok_or_else(|| {
377 async_graphql::Error::new(format!("Unknown orderBy field: {field_path}"))
378 })?;
379
380 Ok(vec![OrderExpr { column, descending }])
381}
382
383fn compile_filter_inline(node: &FilterNode) -> String {
386 match node {
387 FilterNode::Empty => String::new(),
388 FilterNode::Condition { column, op, value } => {
389 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
390 if op.is_unary() {
391 return format!("{col} {}", op.sql_op());
392 }
393 let val_str = match value {
394 SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
395 SqlValue::Int(i) => i.to_string(),
396 SqlValue::Float(f) => f.to_string(),
397 SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
398 };
399 match op {
400 CompareOp::In | CompareOp::NotIn => {
401 if let SqlValue::String(csv) = value {
402 let items: Vec<String> = csv.split(',')
403 .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
404 .collect();
405 format!("{col} {} ({})", op.sql_op(), items.join(", "))
406 } else {
407 format!("{col} {} ({val_str})", op.sql_op())
408 }
409 }
410 CompareOp::Includes => {
411 let like_val = match value {
412 SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
413 _ => val_str,
414 };
415 format!("{col} LIKE {like_val}")
416 }
417 _ => format!("{col} {} {val_str}", op.sql_op()),
418 }
419 }
420 FilterNode::And(children) => {
421 let parts: Vec<String> = children.iter()
422 .map(compile_filter_inline)
423 .filter(|s| !s.is_empty())
424 .collect();
425 match parts.len() {
426 0 => String::new(),
427 1 => parts.into_iter().next().unwrap(),
428 _ => format!("({})", parts.join(" AND ")),
429 }
430 }
431 FilterNode::Or(children) => {
432 let parts: Vec<String> = children.iter()
433 .map(compile_filter_inline)
434 .filter(|s| !s.is_empty())
435 .collect();
436 match parts.len() {
437 0 => String::new(),
438 1 => parts.into_iter().next().unwrap(),
439 _ => format!("({})", parts.join(" OR ")),
440 }
441 }
442 }
443}
444
445fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
448 let cols = collect_having_columns(having);
449 for col in cols {
450 if !col.contains('(') {
451 continue;
452 }
453 let already_present = selects.iter().any(|s| match s {
454 SelectExpr::Column { column, .. } => column == &col,
455 _ => false,
456 });
457 if !already_present {
458 selects.push(SelectExpr::Column {
459 column: col,
460 alias: None,
461 });
462 }
463 }
464}
465
466fn collect_having_columns(node: &FilterNode) -> Vec<String> {
467 match node {
468 FilterNode::Empty => vec![],
469 FilterNode::Condition { column, .. } => vec![column.clone()],
470 FilterNode::And(children) | FilterNode::Or(children) => {
471 children.iter().flat_map(collect_having_columns).collect()
472 }
473 }
474}
475
476fn is_aggregate_column(column: &str) -> bool {
479 column.contains('(')
480}
481
482fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
485 match node {
486 FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
487 FilterNode::Condition { ref column, .. } => {
488 if is_aggregate_column(column) {
489 (FilterNode::Empty, node)
490 } else {
491 (node, FilterNode::Empty)
492 }
493 }
494 FilterNode::And(children) => {
495 let mut where_parts = Vec::new();
496 let mut having_parts = Vec::new();
497 for child in children {
498 let (w, h) = split_aggregate_filters(child);
499 if !w.is_empty() { where_parts.push(w); }
500 if !h.is_empty() { having_parts.push(h); }
501 }
502 let where_node = match where_parts.len() {
503 0 => FilterNode::Empty,
504 1 => where_parts.into_iter().next().unwrap(),
505 _ => FilterNode::And(where_parts),
506 };
507 let having_node = match having_parts.len() {
508 0 => FilterNode::Empty,
509 1 => having_parts.into_iter().next().unwrap(),
510 _ => FilterNode::And(having_parts),
511 };
512 (where_node, having_node)
513 }
514 FilterNode::Or(children) => {
515 let any_aggregate = children.iter().any(filter_has_aggregate);
516 if any_aggregate {
517 (FilterNode::Empty, FilterNode::Or(children))
518 } else {
519 (FilterNode::Or(children), FilterNode::Empty)
520 }
521 }
522 }
523}
524
525fn filter_has_aggregate(node: &FilterNode) -> bool {
526 match node {
527 FilterNode::Empty => false,
528 FilterNode::Condition { column, .. } => is_aggregate_column(column),
529 FilterNode::And(children) | FilterNode::Or(children) => {
530 children.iter().any(filter_has_aggregate)
531 }
532 }
533}
534
535fn parse_limit_by(
536 args: &ObjectAccessor,
537 cube: &CubeDefinition,
538) -> Result<Option<LimitByExpr>, async_graphql::Error> {
539 let lb_val = match args.try_get("limitBy") {
540 Ok(v) => v,
541 Err(_) => return Ok(None),
542 };
543 let lb_obj = lb_val.object()?;
544 let count = lb_obj.try_get("count")?.i64()? as u32;
545 let offset = lb_obj
546 .try_get("offset")
547 .ok()
548 .and_then(|v| v.i64().ok())
549 .unwrap_or(0) as u32;
550 let by_str = lb_obj.try_get("by")?.string()?;
551
552 let flat = cube.flat_dimensions();
553 let columns: Vec<String> = by_str
554 .split(',')
555 .map(|s| {
556 let trimmed = s.trim();
557 flat.iter()
558 .find(|(path, _)| path == trimmed)
559 .map(|(_, dim)| dim.column.clone())
560 .unwrap_or_else(|| trimmed.to_string())
561 })
562 .collect();
563
564 if columns.is_empty() {
565 return Err(async_graphql::Error::new("limitBy.by must specify at least one field"));
566 }
567
568 Ok(Some(LimitByExpr { count, offset, columns }))
569}