1use std::collections::HashSet;
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8
/// A single aggregate ("metric") requested alongside a cube query.
///
/// Consumed by [`parse_cube_query`], which turns each request into a
/// `SelectExpr::Aggregate` and, optionally, a HAVING condition.
pub struct MetricRequest {
    /// Aggregate function name as supplied by the caller. It is upper-cased to form the
    /// SQL function and used verbatim in the `__{function}` select alias.
    pub function: String,
    /// Flattened dimension path to aggregate over. Paths that are not found among the
    /// cube's flat dimensions fall back to `*`.
    pub of_dimension: String,
    /// Optional comparison object (`eq` / `gt` / `ge` / `lt` / `le`) applied to the
    /// aggregate result; only `Value::Object` inputs are interpreted.
    pub select_where_value: Option<async_graphql::Value>,
    /// Optional filter compiled to an inline SQL condition attached to the aggregate.
    pub condition_filter: Option<FilterNode>,
}
18
19pub fn parse_cube_query(
20 cube: &CubeDefinition,
21 network: &str,
22 args: &ObjectAccessor,
23 metrics: &[MetricRequest],
24 requested_fields: Option<HashSet<String>>,
25) -> Result<QueryIR, async_graphql::Error> {
26 let table = cube.table_for_chain(network);
27
28 let filters = if let Ok(where_val) = args.try_get("where") {
29 if let Ok(where_obj) = where_val.object() {
30 filter::parse_where(&where_obj, &cube.dimensions)?
31 } else {
32 FilterNode::Empty
33 }
34 } else {
35 FilterNode::Empty
36 };
37
38 let filters = merge_selector_filters(filters, args, &cube.selectors)?;
39 let filters = if let Some(ref chain_col) = cube.chain_column {
42 let chain_filter = FilterNode::Condition {
43 column: chain_col.clone(),
44 op: CompareOp::Eq,
45 value: SqlValue::String(network.to_string()),
46 };
47 if filters.is_empty() {
48 chain_filter
49 } else {
50 FilterNode::And(vec![chain_filter, filters])
51 }
52 } else {
53 filters
54 };
55 let filters = apply_default_filters(filters, &cube.default_filters);
56 let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;
57 let order_by = parse_order_by(args, cube)?;
58
59 let flat = cube.flat_dimensions();
60 let mut selects: Vec<SelectExpr> = flat
61 .iter()
62 .filter(|(path, _)| {
63 requested_fields
64 .as_ref()
65 .is_none_or(|rf| rf.contains(path))
66 })
67 .map(|(_, dim)| SelectExpr::Column {
68 column: dim.column.clone(),
69 alias: None,
70 })
71 .collect();
72
73 if selects.is_empty() && !flat.is_empty() && metrics.is_empty() {
77 selects = flat
78 .iter()
79 .map(|(_, dim)| SelectExpr::Column {
80 column: dim.column.clone(),
81 alias: None,
82 })
83 .collect();
84 }
85
86 let (filters, agg_having) = split_aggregate_filters(filters);
89
90 let mut group_by = Vec::new();
91 let mut having = agg_having;
92
93 if !metrics.is_empty() {
94 group_by = selects
95 .iter()
96 .filter_map(|s| match s {
97 SelectExpr::Column { column, .. } => Some(column.clone()),
98 _ => None,
99 })
100 .collect();
101
102 for m in metrics {
103 let dim_col = flat
104 .iter()
105 .find(|(path, _)| path == &m.of_dimension)
106 .map(|(_, dim)| dim.column.clone())
107 .unwrap_or_else(|| "*".to_string());
108
109 let func = m.function.to_uppercase();
110 let alias = format!("__{}", m.function);
111
112 let condition = m.condition_filter.as_ref().and_then(|f| {
113 let sql = compile_filter_inline(f);
114 if sql.is_empty() { None } else { Some(sql) }
115 });
116
117 selects.push(SelectExpr::Aggregate {
118 function: func.clone(),
119 column: dim_col.clone(),
120 alias,
121 condition,
122 });
123
124 if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
125 let agg_expr = if func == "COUNT" && dim_col == "*" {
126 "COUNT(*)".to_string()
127 } else if func == "UNIQ" {
128 format!("COUNT(DISTINCT `{dim_col}`)")
129 } else {
130 format!("{func}(`{dim_col}`)")
131 };
132
133 let h = parse_select_where_from_value(obj, &agg_expr)?;
134 if !h.is_empty() {
135 having = if having.is_empty() {
136 h
137 } else {
138 FilterNode::And(vec![having, h])
139 };
140 }
141 }
142 }
143 }
144
145 ensure_having_columns_in_selects(&having, &mut selects);
148
149 let limit_by = parse_limit_by(args, cube)?;
150
151 Ok(QueryIR {
152 cube: cube.name.clone(),
153 schema: cube.schema.clone(),
154 table,
155 selects,
156 filters,
157 having,
158 group_by,
159 order_by,
160 limit,
161 offset,
162 limit_by,
163 use_final: cube.use_final,
164 })
165}
166
167fn parse_select_where_from_value(
170 obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
171 aggregate_expr: &str,
172) -> Result<FilterNode, async_graphql::Error> {
173 let mut conditions = Vec::new();
174
175 for (key, op) in &[
176 ("eq", CompareOp::Eq),
177 ("gt", CompareOp::Gt),
178 ("ge", CompareOp::Ge),
179 ("lt", CompareOp::Lt),
180 ("le", CompareOp::Le),
181 ] {
182 if let Some(val) = obj.get(*key) {
183 let sql_val = match val {
184 async_graphql::Value::String(s) => {
185 if let Ok(f) = s.parse::<f64>() {
186 SqlValue::Float(f)
187 } else {
188 SqlValue::String(s.clone())
189 }
190 }
191 async_graphql::Value::Number(n) => {
192 if let Some(f) = n.as_f64() {
193 SqlValue::Float(f)
194 } else {
195 SqlValue::Int(n.as_i64().unwrap_or(0))
196 }
197 }
198 _ => continue,
199 };
200 conditions.push(FilterNode::Condition {
201 column: aggregate_expr.to_string(),
202 op: op.clone(),
203 value: sql_val,
204 });
205 }
206 }
207
208 Ok(match conditions.len() {
209 0 => FilterNode::Empty,
210 1 => conditions.into_iter().next().unwrap(),
211 _ => FilterNode::And(conditions),
212 })
213}
214
215fn merge_selector_filters(
216 base: FilterNode,
217 args: &ObjectAccessor,
218 selectors: &[SelectorDef],
219) -> Result<FilterNode, async_graphql::Error> {
220 let mut extra = Vec::new();
221
222 for sel in selectors {
223 if let Ok(val) = args.try_get(&sel.graphql_name) {
224 if let Ok(obj) = val.object() {
225 let leaf_filters =
226 filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
227 extra.extend(leaf_filters);
228 }
229 }
230 }
231
232 if extra.is_empty() {
233 return Ok(base);
234 }
235 if base.is_empty() {
236 return Ok(if extra.len() == 1 {
237 extra.remove(0)
238 } else {
239 FilterNode::And(extra)
240 });
241 }
242 extra.push(base);
243 Ok(FilterNode::And(extra))
244}
245
246fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
247 if defaults.is_empty() {
248 return user_filters;
249 }
250
251 let mut default_nodes: Vec<FilterNode> = defaults
252 .iter()
253 .map(|(col, val)| {
254 let sql_val = if val == "true" || val == "false" {
255 SqlValue::Bool(val == "true")
256 } else if let Ok(n) = val.parse::<i64>() {
257 SqlValue::Int(n)
258 } else {
259 SqlValue::String(val.clone())
260 };
261 FilterNode::Condition {
262 column: col.clone(),
263 op: CompareOp::Eq,
264 value: sql_val,
265 }
266 })
267 .collect();
268
269 if user_filters.is_empty() {
270 if default_nodes.len() == 1 {
271 return default_nodes.remove(0);
272 }
273 return FilterNode::And(default_nodes);
274 }
275
276 default_nodes.push(user_filters);
277 FilterNode::And(default_nodes)
278}
279
280fn parse_limit(
281 args: &ObjectAccessor,
282 default: u32,
283 max: u32,
284) -> Result<(u32, u32), async_graphql::Error> {
285 let mut limit = default;
286 let mut offset = 0u32;
287
288 if let Ok(limit_val) = args.try_get("limit") {
289 if let Ok(limit_obj) = limit_val.object() {
290 if let Ok(count) = limit_obj.try_get("count") {
291 limit = (count.i64()? as u32).min(max);
292 }
293 if let Ok(off) = limit_obj.try_get("offset") {
294 offset = off.i64()? as u32;
295 }
296 }
297 }
298
299 Ok((limit, offset))
300}
301
302fn parse_order_by(
303 args: &ObjectAccessor,
304 cube: &CubeDefinition,
305) -> Result<Vec<OrderExpr>, async_graphql::Error> {
306 let flat = cube.flat_dimensions();
307
308 if let Ok(list_val) = args.try_get("orderByList") {
309 if let Ok(list) = list_val.list() {
310 let mut orders = Vec::new();
311 for item in list.iter() {
312 let obj = item.object()
313 .map_err(|_| async_graphql::Error::new("orderByList items must be objects"))?;
314 let field_accessor = obj.try_get("field")
315 .map_err(|_| async_graphql::Error::new("orderByList item requires 'field'"))?;
316 let field_str = field_accessor.enum_name()
317 .map_err(|_| async_graphql::Error::new("orderByList 'field' must be an enum value"))?;
318 let descending = if let Ok(dir_accessor) = obj.try_get("direction") {
319 dir_accessor.enum_name() == Ok("DESC")
320 } else {
321 false
322 };
323 let column = flat.iter()
324 .find(|(p, _)| p == field_str)
325 .map(|(_, dim)| dim.column.clone())
326 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {field_str}")))?;
327 orders.push(OrderExpr { column, descending });
328 }
329 if !orders.is_empty() {
330 return Ok(orders);
331 }
332 }
333 }
334
335 let order_val = match args.try_get("orderBy") {
336 Ok(v) => v,
337 Err(_) => return Ok(Vec::new()),
338 };
339
340 let enum_str = order_val
341 .enum_name()
342 .map_err(|_| async_graphql::Error::new("orderBy must be an enum value"))?;
343
344 let (descending, field_path) = if let Some(path) = enum_str.strip_suffix("_DESC") {
345 (true, path)
346 } else if let Some(path) = enum_str.strip_suffix("_ASC") {
347 (false, path)
348 } else {
349 return Err(async_graphql::Error::new(format!(
350 "Invalid orderBy value: {enum_str}"
351 )));
352 };
353
354 let column = flat
355 .iter()
356 .find(|(p, _)| p == field_path)
357 .map(|(_, dim)| dim.column.clone())
358 .ok_or_else(|| {
359 async_graphql::Error::new(format!("Unknown orderBy field: {field_path}"))
360 })?;
361
362 Ok(vec![OrderExpr { column, descending }])
363}
364
365fn compile_filter_inline(node: &FilterNode) -> String {
368 match node {
369 FilterNode::Empty => String::new(),
370 FilterNode::Condition { column, op, value } => {
371 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
372 if op.is_unary() {
373 return format!("{col} {}", op.sql_op());
374 }
375 let val_str = match value {
376 SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
377 SqlValue::Int(i) => i.to_string(),
378 SqlValue::Float(f) => f.to_string(),
379 SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
380 };
381 match op {
382 CompareOp::In | CompareOp::NotIn => {
383 if let SqlValue::String(csv) = value {
384 let items: Vec<String> = csv.split(',')
385 .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
386 .collect();
387 format!("{col} {} ({})", op.sql_op(), items.join(", "))
388 } else {
389 format!("{col} {} ({val_str})", op.sql_op())
390 }
391 }
392 CompareOp::Includes => {
393 let like_val = match value {
394 SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
395 _ => val_str,
396 };
397 format!("{col} LIKE {like_val}")
398 }
399 _ => format!("{col} {} {val_str}", op.sql_op()),
400 }
401 }
402 FilterNode::And(children) => {
403 let parts: Vec<String> = children.iter()
404 .map(compile_filter_inline)
405 .filter(|s| !s.is_empty())
406 .collect();
407 match parts.len() {
408 0 => String::new(),
409 1 => parts.into_iter().next().unwrap(),
410 _ => format!("({})", parts.join(" AND ")),
411 }
412 }
413 FilterNode::Or(children) => {
414 let parts: Vec<String> = children.iter()
415 .map(compile_filter_inline)
416 .filter(|s| !s.is_empty())
417 .collect();
418 match parts.len() {
419 0 => String::new(),
420 1 => parts.into_iter().next().unwrap(),
421 _ => format!("({})", parts.join(" OR ")),
422 }
423 }
424 }
425}
426
427fn ensure_having_columns_in_selects(having: &FilterNode, selects: &mut Vec<SelectExpr>) {
430 let cols = collect_having_columns(having);
431 for col in cols {
432 if !col.contains('(') {
433 continue;
434 }
435 let already_present = selects.iter().any(|s| match s {
436 SelectExpr::Column { column, .. } => column == &col,
437 _ => false,
438 });
439 if !already_present {
440 selects.push(SelectExpr::Column {
441 column: col,
442 alias: None,
443 });
444 }
445 }
446}
447
448fn collect_having_columns(node: &FilterNode) -> Vec<String> {
449 match node {
450 FilterNode::Empty => vec![],
451 FilterNode::Condition { column, .. } => vec![column.clone()],
452 FilterNode::And(children) | FilterNode::Or(children) => {
453 children.iter().flat_map(collect_having_columns).collect()
454 }
455 }
456}
457
/// Reports whether `column` is an aggregate expression rather than a plain column
/// name, judged solely by the presence of an opening parenthesis.
fn is_aggregate_column(column: &str) -> bool {
    column.chars().any(|ch| ch == '(')
}
463
464fn split_aggregate_filters(node: FilterNode) -> (FilterNode, FilterNode) {
467 match node {
468 FilterNode::Empty => (FilterNode::Empty, FilterNode::Empty),
469 FilterNode::Condition { ref column, .. } => {
470 if is_aggregate_column(column) {
471 (FilterNode::Empty, node)
472 } else {
473 (node, FilterNode::Empty)
474 }
475 }
476 FilterNode::And(children) => {
477 let mut where_parts = Vec::new();
478 let mut having_parts = Vec::new();
479 for child in children {
480 let (w, h) = split_aggregate_filters(child);
481 if !w.is_empty() { where_parts.push(w); }
482 if !h.is_empty() { having_parts.push(h); }
483 }
484 let where_node = match where_parts.len() {
485 0 => FilterNode::Empty,
486 1 => where_parts.into_iter().next().unwrap(),
487 _ => FilterNode::And(where_parts),
488 };
489 let having_node = match having_parts.len() {
490 0 => FilterNode::Empty,
491 1 => having_parts.into_iter().next().unwrap(),
492 _ => FilterNode::And(having_parts),
493 };
494 (where_node, having_node)
495 }
496 FilterNode::Or(children) => {
497 let any_aggregate = children.iter().any(filter_has_aggregate);
498 if any_aggregate {
499 (FilterNode::Empty, FilterNode::Or(children))
500 } else {
501 (FilterNode::Or(children), FilterNode::Empty)
502 }
503 }
504 }
505}
506
507fn filter_has_aggregate(node: &FilterNode) -> bool {
508 match node {
509 FilterNode::Empty => false,
510 FilterNode::Condition { column, .. } => is_aggregate_column(column),
511 FilterNode::And(children) | FilterNode::Or(children) => {
512 children.iter().any(filter_has_aggregate)
513 }
514 }
515}
516
517fn parse_limit_by(
518 args: &ObjectAccessor,
519 cube: &CubeDefinition,
520) -> Result<Option<LimitByExpr>, async_graphql::Error> {
521 let lb_val = match args.try_get("limitBy") {
522 Ok(v) => v,
523 Err(_) => return Ok(None),
524 };
525 let lb_obj = lb_val.object()?;
526 let count = lb_obj.try_get("count")?.i64()? as u32;
527 let offset = lb_obj
528 .try_get("offset")
529 .ok()
530 .and_then(|v| v.i64().ok())
531 .unwrap_or(0) as u32;
532 let by_str = lb_obj.try_get("by")?.string()?;
533
534 let flat = cube.flat_dimensions();
535 let columns: Vec<String> = by_str
536 .split(',')
537 .map(|s| {
538 let trimmed = s.trim();
539 flat.iter()
540 .find(|(path, _)| path == trimmed)
541 .map(|(_, dim)| dim.column.clone())
542 .unwrap_or_else(|| trimmed.to_string())
543 })
544 .collect();
545
546 if columns.is_empty() {
547 return Err(async_graphql::Error::new("limitBy.by must specify at least one field"));
548 }
549
550 Ok(Some(LimitByExpr { count, offset, columns }))
551}