1use std::collections::HashSet;
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8
9pub struct MetricRequest {
11 pub function: String,
12 pub of_dimension: String,
13 pub select_where_value: Option<async_graphql::Value>,
15 pub condition_filter: Option<FilterNode>,
17}
18
19pub fn parse_cube_query(
20 cube: &CubeDefinition,
21 network: &str,
22 args: &ObjectAccessor,
23 metrics: &[MetricRequest],
24 requested_fields: Option<HashSet<String>>,
25) -> Result<QueryIR, async_graphql::Error> {
26 let flat = cube.flat_dimensions();
27 let requested_cols: Vec<String> = flat.iter()
28 .filter(|(path, _)| {
29 requested_fields.as_ref().is_none_or(|rf| rf.contains(path))
30 })
31 .map(|(_, dim)| dim.column.clone())
32 .collect();
33 let (schema, table) = cube.resolve_table(network, &requested_cols);
34
35 let filters = if let Ok(where_val) = args.try_get("where") {
36 if let Ok(where_obj) = where_val.object() {
37 filter::parse_where(&where_obj, &cube.dimensions)?
38 } else {
39 FilterNode::Empty
40 }
41 } else {
42 FilterNode::Empty
43 };
44
45 let filters = merge_selector_filters(filters, args, &cube.selectors)?;
46 let filters = if let Some(ref chain_col) = cube.chain_column {
49 let chain_filter = FilterNode::Condition {
50 column: chain_col.clone(),
51 op: CompareOp::Eq,
52 value: SqlValue::String(network.to_string()),
53 };
54 if filters.is_empty() {
55 chain_filter
56 } else {
57 FilterNode::And(vec![chain_filter, filters])
58 }
59 } else {
60 filters
61 };
62 let filters = apply_default_filters(filters, &cube.default_filters);
63 let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;
64 let order_by = parse_order_by(args, cube)?;
65
66 let mut selects: Vec<SelectExpr> = flat
67 .iter()
68 .filter(|(path, _)| {
69 requested_fields
70 .as_ref()
71 .is_none_or(|rf| rf.contains(path))
72 })
73 .map(|(_, dim)| SelectExpr::Column {
74 column: dim.column.clone(),
75 alias: None,
76 })
77 .collect();
78
79 if selects.is_empty() && !flat.is_empty() && metrics.is_empty() {
83 selects = flat
84 .iter()
85 .map(|(_, dim)| SelectExpr::Column {
86 column: dim.column.clone(),
87 alias: None,
88 })
89 .collect();
90 }
91
92 let (filters, agg_having) = split_aggregate_filters(filters);
95
96 let mut group_by = Vec::new();
97 let mut having = agg_having;
98
99 if !metrics.is_empty() {
100 group_by = selects
101 .iter()
102 .filter_map(|s| match s {
103 SelectExpr::Column { column, .. } => Some(column.clone()),
104 _ => None,
105 })
106 .collect();
107
108 for m in metrics {
109 let dim_col = flat
110 .iter()
111 .find(|(path, _)| path == &m.of_dimension)
112 .map(|(_, dim)| dim.column.clone())
113 .unwrap_or_else(|| "*".to_string());
114
115 let alias = format!("__{}", m.function);
116 let metric_def = cube.find_metric(&m.function);
117
118 if let Some(md) = metric_def.filter(|md| md.expression_template.is_some()) {
119 let tmpl = md.expression_template.as_ref().unwrap();
120 let expanded = tmpl.replace("{column}", &dim_col);
121 selects.push(SelectExpr::Column {
122 column: expanded,
123 alias: Some(alias),
124 });
125 } else {
126 let func = m.function.to_uppercase();
127 let condition = m.condition_filter.as_ref().and_then(|f| {
128 let sql = compile_filter_inline(f);
129 if sql.is_empty() { None } else { Some(sql) }
130 });
131
132 selects.push(SelectExpr::Aggregate {
133 function: func.clone(),
134 column: dim_col.clone(),
135 alias: alias.clone(),
136 condition,
137 });
138
139 if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
140 let agg_expr = if func == "COUNT" && dim_col == "*" {
141 "COUNT(*)".to_string()
142 } else if func == "UNIQ" {
143 format!("COUNT(DISTINCT `{dim_col}`)")
144 } else {
145 format!("{func}(`{dim_col}`)")
146 };
147
148 let h = parse_select_where_from_value(obj, &agg_expr)?;
149 if !h.is_empty() {
150 having = if having.is_empty() {
151 h
152 } else {
153 FilterNode::And(vec![having, h])
154 };
155 }
156 }
157 }
158 }
159 }
160
161 ensure_having_columns_in_selects(&having, &mut selects);
164
165 let limit_by = parse_limit_by(args, cube)?;
166
167 let from_subquery = cube.from_subquery.as_ref().map(|s| {
168 s.replace("{schema}", &schema).replace("{chain}", network)
169 });
170
171 Ok(QueryIR {
172 cube: cube.name.clone(),
173 schema,
174 table,
175 selects,
176 filters,
177 having,
178 group_by,
179 order_by,
180 limit,
181 offset,
182 limit_by,
183 use_final: cube.use_final,
184 joins: Vec::new(),
185 custom_query_builder: cube.custom_query_builder.clone(),
186 from_subquery,
187 })
188}
189
190fn parse_select_where_from_value(
193 obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
194 aggregate_expr: &str,
195) -> Result<FilterNode, async_graphql::Error> {
196 let mut conditions = Vec::new();
197
198 for (key, op) in &[
199 ("eq", CompareOp::Eq),
200 ("gt", CompareOp::Gt),
201 ("ge", CompareOp::Ge),
202 ("lt", CompareOp::Lt),
203 ("le", CompareOp::Le),
204 ] {
205 if let Some(val) = obj.get(*key) {
206 let sql_val = match val {
207 async_graphql::Value::String(s) => {
208 if let Ok(f) = s.parse::<f64>() {
209 SqlValue::Float(f)
210 } else {
211 SqlValue::String(s.clone())
212 }
213 }
214 async_graphql::Value::Number(n) => {
215 if let Some(f) = n.as_f64() {
216 SqlValue::Float(f)
217 } else {
218 SqlValue::Int(n.as_i64().unwrap_or(0))
219 }
220 }
221 _ => continue,
222 };
223 conditions.push(FilterNode::Condition {
224 column: aggregate_expr.to_string(),
225 op: op.clone(),
226 value: sql_val,
227 });
228 }
229 }
230
231 Ok(match conditions.len() {
232 0 => FilterNode::Empty,
233 1 => conditions.into_iter().next().unwrap(),
234 _ => FilterNode::And(conditions),
235 })
236}
237
238fn merge_selector_filters(
239 base: FilterNode,
240 args: &ObjectAccessor,
241 selectors: &[SelectorDef],
242) -> Result<FilterNode, async_graphql::Error> {
243 let mut extra = Vec::new();
244
245 for sel in selectors {
246 if let Ok(val) = args.try_get(&sel.graphql_name) {
247 if let Ok(obj) = val.object() {
248 let leaf_filters =
249 filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
250 extra.extend(leaf_filters);
251 }
252 }
253 }
254
255 if extra.is_empty() {
256 return Ok(base);
257 }
258 if base.is_empty() {
259 return Ok(if extra.len() == 1 {
260 extra.remove(0)
261 } else {
262 FilterNode::And(extra)
263 });
264 }
265 extra.push(base);
266 Ok(FilterNode::And(extra))
267}
268
269fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
270 if defaults.is_empty() {
271 return user_filters;
272 }
273
274 let mut default_nodes: Vec<FilterNode> = defaults
275 .iter()
276 .map(|(col, val)| {
277 let sql_val = if val == "true" || val == "false" {
278 SqlValue::Bool(val == "true")
279 } else if let Ok(n) = val.parse::<i64>() {
280 SqlValue::Int(n)
281 } else {
282 SqlValue::String(val.clone())
283 };
284 FilterNode::Condition {
285 column: col.clone(),
286 op: CompareOp::Eq,
287 value: sql_val,
288 }
289 })
290 .collect();
291
292 if user_filters.is_empty() {
293 if default_nodes.len() == 1 {
294 return default_nodes.remove(0);
295 }
296 return FilterNode::And(default_nodes);
297 }
298
299 default_nodes.push(user_filters);
300 FilterNode::And(default_nodes)
301}
302
303fn parse_limit(
304 args: &ObjectAccessor,
305 default: u32,
306 max: u32,
307) -> Result<(u32, u32), async_graphql::Error> {
308 let mut limit = default;
309 let mut offset = 0u32;
310
311 if let Ok(limit_val) = args.try_get("limit") {
312 if let Ok(limit_obj) = limit_val.object() {
313 if let Ok(count) = limit_obj.try_get("count") {
314 limit = (count.i64()? as u32).min(max);
315 }
316 if let Ok(off) = limit_obj.try_get("offset") {
317 offset = off.i64()? as u32;
318 }
319 }
320 }
321
322 Ok((limit, offset))
323}
324
325fn parse_order_by(
326 args: &ObjectAccessor,
327 cube: &CubeDefinition,
328) -> Result<Vec<OrderExpr>, async_graphql::Error> {
329 let flat = cube.flat_dimensions();
330
331 if let Ok(list_val) = args.try_get("orderByList") {
332 if let Ok(list) = list_val.list() {
333 let mut orders = Vec::new();
334 for item in list.iter() {
335 let obj = item.object()
336 .map_err(|_| async_graphql::Error::new("orderByList items must be objects"))?;
337 let field_accessor = obj.try_get("field")
338 .map_err(|_| async_graphql::Error::new("orderByList item requires 'field'"))?;
339 let field_str = field_accessor.enum_name()
340 .map_err(|_| async_graphql::Error::new("orderByList 'field' must be an enum value"))?;
341 let descending = if let Ok(dir_accessor) = obj.try_get("direction") {
342 dir_accessor.enum_name() == Ok("DESC")
343 } else {
344 false
345 };
346 let column = flat.iter()
347 .find(|(p, _)| p == field_str)
348 .map(|(_, dim)| dim.column.clone())
349 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {field_str}")))?;
350 orders.push(OrderExpr { column, descending });
351 }
352 if !orders.is_empty() {
353 return Ok(orders);
354 }
355 }
356 }
357
358 let order_val = match args.try_get("orderBy") {
359 Ok(v) => v,
360 Err(_) => return Ok(Vec::new()),
361 };
362
363 let enum_str = order_val
364 .enum_name()
365 .map_err(|_| async_graphql::Error::new("orderBy must be an enum value"))?;
366
367 let (descending, field_path) = if let Some(path) = enum_str.strip_suffix("_DESC") {
368 (true, path)
369 } else if let Some(path) = enum_str.strip_suffix("_ASC") {
370 (false, path)
371 } else {
372 return Err(async_graphql::Error::new(format!(
373 "Invalid orderBy value: {enum_str}"
374 )));
375 };
376
377 let column = flat
378 .iter()
379 .find(|(p, _)| p == field_path)
380 .map(|(_, dim)| dim.column.clone())
381 .ok_or_else(|| {
382 async_graphql::Error::new(format!("Unknown orderBy field: {field_path}"))
383 })?;
384
385 Ok(vec![OrderExpr { column, descending }])
386}
387
388fn compile_filter_inline(node: &FilterNode) -> String {
391 match node {
392 FilterNode::Empty => String::new(),
393 FilterNode::Condition { column, op, value } => {
394 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
395 if op.is_unary() {
396 return format!("{col} {}", op.sql_op());
397 }
398 let val_str = match value {
399 SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
400 SqlValue::Int(i) => i.to_string(),
401 SqlValue::Float(f) => f.to_string(),
402 SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
403 };
404 match op {
405 CompareOp::In | CompareOp::NotIn => {
406 if let SqlValue::String(csv) = value {
407 let items: Vec<String> = csv.split(',')
408 .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
409 .collect();
410 format!("{col} {} ({})", op.sql_op(), items.join(", "))
411 } else {
412 format!("{col} {} ({val_str})", op.sql_op())
413 }
414 }
415 CompareOp::Includes => {
416 let like_val = match value {
417 SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
418 _ => val_str,
419 };
420 format!("{col} LIKE {like_val}")
421 }
422 _ => format!("{col} {} {val_str}", op.sql_op()),
423 }
424 }
425 FilterNode::And(children) => {
426 let parts: Vec<String> = children.iter()
427 .map(compile_filter_inline)
428 .filter(|s| !s.is_empty())
429 .collect();
430 match parts.len() {
431 0 => String::new(),
432 1 => parts.into_iter().next().unwrap(),
433 _ => format!("({})", parts.join(" AND ")),
434 }
435 }
436 FilterNode::Or(children) => {
437 let parts: Vec<String> = children.iter()
438 .map(compile_filter_inline)
439 .filter(|s| !s.is_empty())
440 .collect();
441 match parts.len() {
442 0 => String::new(),
443 1 => parts.into_iter().next().unwrap(),
444 _ => format!("({})", parts.join(" OR ")),
445 }
446 }
447 }
448}
449
450fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
453 let cols = collect_having_columns(having);
454 for col in cols {
455 if !col.contains('(') {
456 continue;
457 }
458 let already_present = selects.iter().any(|s| match s {
459 SelectExpr::Column { column, .. } => column == &col,
460 _ => false,
461 });
462 if !already_present {
463 selects.push(SelectExpr::Column {
464 column: col,
465 alias: None,
466 });
467 }
468 }
469}
470
471fn collect_having_columns(node: &FilterNode) -> Vec<String> {
472 match node {
473 FilterNode::Empty => vec![],
474 FilterNode::Condition { column, .. } => vec![column.clone()],
475 FilterNode::And(children) | FilterNode::Or(children) => {
476 children.iter().flat_map(collect_having_columns).collect()
477 }
478 }
479}
480
/// Returns `true` when `column` contains an opening parenthesis, which this
/// module treats as the marker of an expression rather than a plain column name.
fn is_aggregate_column(column: &str) -> bool {
    // '(' is ASCII, so a byte scan is equivalent to a character scan.
    column.bytes().any(|byte| byte == b'(')
}
486
487fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
490 match node {
491 FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
492 FilterNode::Condition { ref column, .. } => {
493 if is_aggregate_column(column) {
494 (FilterNode::Empty, node)
495 } else {
496 (node, FilterNode::Empty)
497 }
498 }
499 FilterNode::And(children) => {
500 let mut where_parts = Vec::new();
501 let mut having_parts = Vec::new();
502 for child in children {
503 let (w, h) = split_aggregate_filters(child);
504 if !w.is_empty() { where_parts.push(w); }
505 if !h.is_empty() { having_parts.push(h); }
506 }
507 let where_node = match where_parts.len() {
508 0 => FilterNode::Empty,
509 1 => where_parts.into_iter().next().unwrap(),
510 _ => FilterNode::And(where_parts),
511 };
512 let having_node = match having_parts.len() {
513 0 => FilterNode::Empty,
514 1 => having_parts.into_iter().next().unwrap(),
515 _ => FilterNode::And(having_parts),
516 };
517 (where_node, having_node)
518 }
519 FilterNode::Or(children) => {
520 let any_aggregate = children.iter().any(filter_has_aggregate);
521 if any_aggregate {
522 (FilterNode::Empty, FilterNode::Or(children))
523 } else {
524 (FilterNode::Or(children), FilterNode::Empty)
525 }
526 }
527 }
528}
529
530fn filter_has_aggregate(node: &FilterNode) -> bool {
531 match node {
532 FilterNode::Empty => false,
533 FilterNode::Condition { column, .. } => is_aggregate_column(column),
534 FilterNode::And(children) | FilterNode::Or(children) => {
535 children.iter().any(filter_has_aggregate)
536 }
537 }
538}
539
540fn parse_limit_by(
541 args: &ObjectAccessor,
542 cube: &CubeDefinition,
543) -> Result<Option<LimitByExpr>, async_graphql::Error> {
544 let lb_val = match args.try_get("limitBy") {
545 Ok(v) => v,
546 Err(_) => return Ok(None),
547 };
548 let lb_obj = lb_val.object()?;
549 let count = lb_obj.try_get("count")?.i64()? as u32;
550 let offset = lb_obj
551 .try_get("offset")
552 .ok()
553 .and_then(|v| v.i64().ok())
554 .unwrap_or(0) as u32;
555 let by_str = lb_obj.try_get("by")?.string()?;
556
557 let flat = cube.flat_dimensions();
558 let columns: Vec<String> = by_str
559 .split(',')
560 .map(|s| {
561 let trimmed = s.trim();
562 flat.iter()
563 .find(|(path, _)| path == trimmed)
564 .map(|(_, dim)| dim.column.clone())
565 .unwrap_or_else(|| trimmed.to_string())
566 })
567 .collect();
568
569 if columns.is_empty() {
570 return Err(async_graphql::Error::new("limitBy.by must specify at least one field"));
571 }
572
573 Ok(Some(LimitByExpr { count, offset, columns }))
574}