1use std::collections::HashSet;
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8
9pub struct MetricRequest {
11 pub function: String,
12 pub of_dimension: String,
13 pub select_where_value: Option<async_graphql::Value>,
15 pub condition_filter: Option<FilterNode>,
17}
18
19pub fn parse_cube_query(
20 cube: &CubeDefinition,
21 network: &str,
22 args: &ObjectAccessor,
23 metrics: &[MetricRequest],
24 requested_fields: Option<HashSet<String>>,
25) -> Result<QueryIR, async_graphql::Error> {
26 let table = cube.table_for_chain(network);
27
28 let filters = if let Ok(where_val) = args.try_get("where") {
29 if let Ok(where_obj) = where_val.object() {
30 filter::parse_where(&where_obj, &cube.dimensions)?
31 } else {
32 FilterNode::Empty
33 }
34 } else {
35 FilterNode::Empty
36 };
37
38 let filters = merge_selector_filters(filters, args, &cube.selectors)?;
39 let filters = if let Some(ref chain_col) = cube.chain_column {
42 let chain_filter = FilterNode::Condition {
43 column: chain_col.clone(),
44 op: CompareOp::Eq,
45 value: SqlValue::String(network.to_string()),
46 };
47 if filters.is_empty() {
48 chain_filter
49 } else {
50 FilterNode::And(vec![chain_filter, filters])
51 }
52 } else {
53 filters
54 };
55 let filters = apply_default_filters(filters, &cube.default_filters);
56 let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;
57 let order_by = parse_order_by(args, cube)?;
58
59 let flat = cube.flat_dimensions();
60 let mut selects: Vec<SelectExpr> = flat
61 .iter()
62 .filter(|(path, _)| {
63 requested_fields
64 .as_ref()
65 .is_none_or(|rf| rf.contains(path))
66 })
67 .map(|(_, dim)| SelectExpr::Column {
68 column: dim.column.clone(),
69 alias: None,
70 })
71 .collect();
72
73 if selects.is_empty() && !flat.is_empty() && metrics.is_empty() {
77 selects = flat
78 .iter()
79 .map(|(_, dim)| SelectExpr::Column {
80 column: dim.column.clone(),
81 alias: None,
82 })
83 .collect();
84 }
85
86 let (filters, agg_having) = split_aggregate_filters(filters);
89
90 let mut group_by = Vec::new();
91 let mut having = agg_having;
92
93 if !metrics.is_empty() {
94 group_by = selects
95 .iter()
96 .filter_map(|s| match s {
97 SelectExpr::Column { column, .. } => Some(column.clone()),
98 _ => None,
99 })
100 .collect();
101
102 for m in metrics {
103 let dim_col = flat
104 .iter()
105 .find(|(path, _)| path == &m.of_dimension)
106 .map(|(_, dim)| dim.column.clone())
107 .unwrap_or_else(|| "*".to_string());
108
109 let func = m.function.to_uppercase();
110 let alias = format!("__{}", m.function);
111
112 let condition = m.condition_filter.as_ref().and_then(|f| {
113 let sql = compile_filter_inline(f);
114 if sql.is_empty() { None } else { Some(sql) }
115 });
116
117 selects.push(SelectExpr::Aggregate {
118 function: func.clone(),
119 column: dim_col.clone(),
120 alias,
121 condition,
122 });
123
124 if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
125 let agg_expr = if func == "COUNT" && dim_col == "*" {
126 "COUNT(*)".to_string()
127 } else if func == "UNIQ" {
128 format!("COUNT(DISTINCT `{dim_col}`)")
129 } else {
130 format!("{func}(`{dim_col}`)")
131 };
132
133 let h = parse_select_where_from_value(obj, &agg_expr)?;
134 if !h.is_empty() {
135 having = if having.is_empty() {
136 h
137 } else {
138 FilterNode::And(vec![having, h])
139 };
140 }
141 }
142 }
143 }
144
145 ensure_having_columns_in_selects(&having, &mut selects);
148
149 let limit_by = parse_limit_by(args, cube)?;
150
151 Ok(QueryIR {
152 cube: cube.name.clone(),
153 schema: cube.schema.clone(),
154 table,
155 selects,
156 filters,
157 having,
158 group_by,
159 order_by,
160 limit,
161 offset,
162 limit_by,
163 use_final: cube.use_final,
164 joins: Vec::new(),
165 })
166}
167
168fn parse_select_where_from_value(
171 obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
172 aggregate_expr: &str,
173) -> Result<FilterNode, async_graphql::Error> {
174 let mut conditions = Vec::new();
175
176 for (key, op) in &[
177 ("eq", CompareOp::Eq),
178 ("gt", CompareOp::Gt),
179 ("ge", CompareOp::Ge),
180 ("lt", CompareOp::Lt),
181 ("le", CompareOp::Le),
182 ] {
183 if let Some(val) = obj.get(*key) {
184 let sql_val = match val {
185 async_graphql::Value::String(s) => {
186 if let Ok(f) = s.parse::<f64>() {
187 SqlValue::Float(f)
188 } else {
189 SqlValue::String(s.clone())
190 }
191 }
192 async_graphql::Value::Number(n) => {
193 if let Some(f) = n.as_f64() {
194 SqlValue::Float(f)
195 } else {
196 SqlValue::Int(n.as_i64().unwrap_or(0))
197 }
198 }
199 _ => continue,
200 };
201 conditions.push(FilterNode::Condition {
202 column: aggregate_expr.to_string(),
203 op: op.clone(),
204 value: sql_val,
205 });
206 }
207 }
208
209 Ok(match conditions.len() {
210 0 => FilterNode::Empty,
211 1 => conditions.into_iter().next().unwrap(),
212 _ => FilterNode::And(conditions),
213 })
214}
215
216fn merge_selector_filters(
217 base: FilterNode,
218 args: &ObjectAccessor,
219 selectors: &[SelectorDef],
220) -> Result<FilterNode, async_graphql::Error> {
221 let mut extra = Vec::new();
222
223 for sel in selectors {
224 if let Ok(val) = args.try_get(&sel.graphql_name) {
225 if let Ok(obj) = val.object() {
226 let leaf_filters =
227 filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
228 extra.extend(leaf_filters);
229 }
230 }
231 }
232
233 if extra.is_empty() {
234 return Ok(base);
235 }
236 if base.is_empty() {
237 return Ok(if extra.len() == 1 {
238 extra.remove(0)
239 } else {
240 FilterNode::And(extra)
241 });
242 }
243 extra.push(base);
244 Ok(FilterNode::And(extra))
245}
246
247fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
248 if defaults.is_empty() {
249 return user_filters;
250 }
251
252 let mut default_nodes: Vec<FilterNode> = defaults
253 .iter()
254 .map(|(col, val)| {
255 let sql_val = if val == "true" || val == "false" {
256 SqlValue::Bool(val == "true")
257 } else if let Ok(n) = val.parse::<i64>() {
258 SqlValue::Int(n)
259 } else {
260 SqlValue::String(val.clone())
261 };
262 FilterNode::Condition {
263 column: col.clone(),
264 op: CompareOp::Eq,
265 value: sql_val,
266 }
267 })
268 .collect();
269
270 if user_filters.is_empty() {
271 if default_nodes.len() == 1 {
272 return default_nodes.remove(0);
273 }
274 return FilterNode::And(default_nodes);
275 }
276
277 default_nodes.push(user_filters);
278 FilterNode::And(default_nodes)
279}
280
281fn parse_limit(
282 args: &ObjectAccessor,
283 default: u32,
284 max: u32,
285) -> Result<(u32, u32), async_graphql::Error> {
286 let mut limit = default;
287 let mut offset = 0u32;
288
289 if let Ok(limit_val) = args.try_get("limit") {
290 if let Ok(limit_obj) = limit_val.object() {
291 if let Ok(count) = limit_obj.try_get("count") {
292 limit = (count.i64()? as u32).min(max);
293 }
294 if let Ok(off) = limit_obj.try_get("offset") {
295 offset = off.i64()? as u32;
296 }
297 }
298 }
299
300 Ok((limit, offset))
301}
302
303fn parse_order_by(
304 args: &ObjectAccessor,
305 cube: &CubeDefinition,
306) -> Result<Vec<OrderExpr>, async_graphql::Error> {
307 let flat = cube.flat_dimensions();
308
309 if let Ok(list_val) = args.try_get("orderByList") {
310 if let Ok(list) = list_val.list() {
311 let mut orders = Vec::new();
312 for item in list.iter() {
313 let obj = item.object()
314 .map_err(|_| async_graphql::Error::new("orderByList items must be objects"))?;
315 let field_accessor = obj.try_get("field")
316 .map_err(|_| async_graphql::Error::new("orderByList item requires 'field'"))?;
317 let field_str = field_accessor.enum_name()
318 .map_err(|_| async_graphql::Error::new("orderByList 'field' must be an enum value"))?;
319 let descending = if let Ok(dir_accessor) = obj.try_get("direction") {
320 dir_accessor.enum_name() == Ok("DESC")
321 } else {
322 false
323 };
324 let column = flat.iter()
325 .find(|(p, _)| p == field_str)
326 .map(|(_, dim)| dim.column.clone())
327 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {field_str}")))?;
328 orders.push(OrderExpr { column, descending });
329 }
330 if !orders.is_empty() {
331 return Ok(orders);
332 }
333 }
334 }
335
336 let order_val = match args.try_get("orderBy") {
337 Ok(v) => v,
338 Err(_) => return Ok(Vec::new()),
339 };
340
341 let enum_str = order_val
342 .enum_name()
343 .map_err(|_| async_graphql::Error::new("orderBy must be an enum value"))?;
344
345 let (descending, field_path) = if let Some(path) = enum_str.strip_suffix("_DESC") {
346 (true, path)
347 } else if let Some(path) = enum_str.strip_suffix("_ASC") {
348 (false, path)
349 } else {
350 return Err(async_graphql::Error::new(format!(
351 "Invalid orderBy value: {enum_str}"
352 )));
353 };
354
355 let column = flat
356 .iter()
357 .find(|(p, _)| p == field_path)
358 .map(|(_, dim)| dim.column.clone())
359 .ok_or_else(|| {
360 async_graphql::Error::new(format!("Unknown orderBy field: {field_path}"))
361 })?;
362
363 Ok(vec![OrderExpr { column, descending }])
364}
365
366fn compile_filter_inline(node: &FilterNode) -> String {
369 match node {
370 FilterNode::Empty => String::new(),
371 FilterNode::Condition { column, op, value } => {
372 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
373 if op.is_unary() {
374 return format!("{col} {}", op.sql_op());
375 }
376 let val_str = match value {
377 SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
378 SqlValue::Int(i) => i.to_string(),
379 SqlValue::Float(f) => f.to_string(),
380 SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
381 };
382 match op {
383 CompareOp::In | CompareOp::NotIn => {
384 if let SqlValue::String(csv) = value {
385 let items: Vec<String> = csv.split(',')
386 .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
387 .collect();
388 format!("{col} {} ({})", op.sql_op(), items.join(", "))
389 } else {
390 format!("{col} {} ({val_str})", op.sql_op())
391 }
392 }
393 CompareOp::Includes => {
394 let like_val = match value {
395 SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
396 _ => val_str,
397 };
398 format!("{col} LIKE {like_val}")
399 }
400 _ => format!("{col} {} {val_str}", op.sql_op()),
401 }
402 }
403 FilterNode::And(children) => {
404 let parts: Vec<String> = children.iter()
405 .map(compile_filter_inline)
406 .filter(|s| !s.is_empty())
407 .collect();
408 match parts.len() {
409 0 => String::new(),
410 1 => parts.into_iter().next().unwrap(),
411 _ => format!("({})", parts.join(" AND ")),
412 }
413 }
414 FilterNode::Or(children) => {
415 let parts: Vec<String> = children.iter()
416 .map(compile_filter_inline)
417 .filter(|s| !s.is_empty())
418 .collect();
419 match parts.len() {
420 0 => String::new(),
421 1 => parts.into_iter().next().unwrap(),
422 _ => format!("({})", parts.join(" OR ")),
423 }
424 }
425 }
426}
427
428fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
431 let cols = collect_having_columns(having);
432 for col in cols {
433 if !col.contains('(') {
434 continue;
435 }
436 let already_present = selects.iter().any(|s| match s {
437 SelectExpr::Column { column, .. } => column == &col,
438 _ => false,
439 });
440 if !already_present {
441 selects.push(SelectExpr::Column {
442 column: col,
443 alias: None,
444 });
445 }
446 }
447}
448
449fn collect_having_columns(node: &FilterNode) -> Vec<String> {
450 match node {
451 FilterNode::Empty => vec![],
452 FilterNode::Condition { column, .. } => vec![column.clone()],
453 FilterNode::And(children) | FilterNode::Or(children) => {
454 children.iter().flat_map(collect_having_columns).collect()
455 }
456 }
457}
458
/// Reports whether `column` is treated as an aggregate expression.
///
/// The check is purely textual: any string containing `(` counts.
fn is_aggregate_column(column: &str) -> bool {
    column.chars().any(|ch| ch == '(')
}
464
465fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
468 match node {
469 FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
470 FilterNode::Condition { ref column, .. } => {
471 if is_aggregate_column(column) {
472 (FilterNode::Empty, node)
473 } else {
474 (node, FilterNode::Empty)
475 }
476 }
477 FilterNode::And(children) => {
478 let mut where_parts = Vec::new();
479 let mut having_parts = Vec::new();
480 for child in children {
481 let (w, h) = split_aggregate_filters(child);
482 if !w.is_empty() { where_parts.push(w); }
483 if !h.is_empty() { having_parts.push(h); }
484 }
485 let where_node = match where_parts.len() {
486 0 => FilterNode::Empty,
487 1 => where_parts.into_iter().next().unwrap(),
488 _ => FilterNode::And(where_parts),
489 };
490 let having_node = match having_parts.len() {
491 0 => FilterNode::Empty,
492 1 => having_parts.into_iter().next().unwrap(),
493 _ => FilterNode::And(having_parts),
494 };
495 (where_node, having_node)
496 }
497 FilterNode::Or(children) => {
498 let any_aggregate = children.iter().any(filter_has_aggregate);
499 if any_aggregate {
500 (FilterNode::Empty, FilterNode::Or(children))
501 } else {
502 (FilterNode::Or(children), FilterNode::Empty)
503 }
504 }
505 }
506}
507
508fn filter_has_aggregate(node: &FilterNode) -> bool {
509 match node {
510 FilterNode::Empty => false,
511 FilterNode::Condition { column, .. } => is_aggregate_column(column),
512 FilterNode::And(children) | FilterNode::Or(children) => {
513 children.iter().any(filter_has_aggregate)
514 }
515 }
516}
517
518fn parse_limit_by(
519 args: &ObjectAccessor,
520 cube: &CubeDefinition,
521) -> Result<Option<LimitByExpr>, async_graphql::Error> {
522 let lb_val = match args.try_get("limitBy") {
523 Ok(v) => v,
524 Err(_) => return Ok(None),
525 };
526 let lb_obj = lb_val.object()?;
527 let count = lb_obj.try_get("count")?.i64()? as u32;
528 let offset = lb_obj
529 .try_get("offset")
530 .ok()
531 .and_then(|v| v.i64().ok())
532 .unwrap_or(0) as u32;
533 let by_str = lb_obj.try_get("by")?.string()?;
534
535 let flat = cube.flat_dimensions();
536 let columns: Vec<String> = by_str
537 .split(',')
538 .map(|s| {
539 let trimmed = s.trim();
540 flat.iter()
541 .find(|(path, _)| path == trimmed)
542 .map(|(_, dim)| dim.column.clone())
543 .unwrap_or_else(|| trimmed.to_string())
544 })
545 .collect();
546
547 if columns.is_empty() {
548 return Err(async_graphql::Error::new("limitBy.by must specify at least one field"));
549 }
550
551 Ok(Some(LimitByExpr { count, offset, columns }))
552}