1use std::collections::HashSet;
2
3use async_graphql::dynamic::ObjectAccessor;
4
5use crate::compiler::filter;
6use crate::compiler::ir::*;
7use crate::cube::definition::{CubeDefinition, SelectorDef};
8
9pub struct MetricRequest {
11 pub function: String,
12 pub of_dimension: String,
13 pub select_where_value: Option<async_graphql::Value>,
15 pub condition_filter: Option<FilterNode>,
17}
18
19pub fn parse_cube_query(
20 cube: &CubeDefinition,
21 network: &str,
22 args: &ObjectAccessor,
23 metrics: &[MetricRequest],
24 requested_fields: Option<HashSet<String>>,
25) -> Result<QueryIR, async_graphql::Error> {
26 let table = cube.table_for_chain(network);
27
28 let filters = if let Ok(where_val) = args.try_get("where") {
29 if let Ok(where_obj) = where_val.object() {
30 filter::parse_where(&where_obj, &cube.dimensions)?
31 } else {
32 FilterNode::Empty
33 }
34 } else {
35 FilterNode::Empty
36 };
37
38 let filters = merge_selector_filters(filters, args, &cube.selectors)?;
39 let filters = if let Some(ref chain_col) = cube.chain_column {
42 let chain_filter = FilterNode::Condition {
43 column: chain_col.clone(),
44 op: CompareOp::Eq,
45 value: SqlValue::String(network.to_string()),
46 };
47 if filters.is_empty() {
48 chain_filter
49 } else {
50 FilterNode::And(vec![chain_filter, filters])
51 }
52 } else {
53 filters
54 };
55 let filters = apply_default_filters(filters, &cube.default_filters);
56 let (limit, offset) = parse_limit(args, cube.default_limit, cube.max_limit)?;
57 let order_by = parse_order_by(args, cube)?;
58
59 let flat = cube.flat_dimensions();
60 let mut selects: Vec<SelectExpr> = flat
61 .iter()
62 .filter(|(path, _)| {
63 requested_fields
64 .as_ref()
65 .is_none_or(|rf| rf.contains(path))
66 })
67 .map(|(_, dim)| SelectExpr::Column {
68 column: dim.column.clone(),
69 alias: None,
70 })
71 .collect();
72
73 if selects.is_empty() && !flat.is_empty() {
75 selects = flat
76 .iter()
77 .map(|(_, dim)| SelectExpr::Column {
78 column: dim.column.clone(),
79 alias: None,
80 })
81 .collect();
82 }
83
84 let mut group_by = Vec::new();
85 let mut having = FilterNode::Empty;
86
87 if !metrics.is_empty() {
88 group_by = selects
89 .iter()
90 .filter_map(|s| match s {
91 SelectExpr::Column { column, .. } => Some(column.clone()),
92 _ => None,
93 })
94 .collect();
95
96 for m in metrics {
97 let dim_col = flat
98 .iter()
99 .find(|(path, _)| path == &m.of_dimension)
100 .map(|(_, dim)| dim.column.clone())
101 .unwrap_or_else(|| "*".to_string());
102
103 let func = m.function.to_uppercase();
104 let alias = format!("__{}", m.function);
105
106 let condition = m.condition_filter.as_ref().and_then(|f| {
107 let sql = compile_filter_inline(f);
108 if sql.is_empty() { None } else { Some(sql) }
109 });
110
111 selects.push(SelectExpr::Aggregate {
112 function: func.clone(),
113 column: dim_col.clone(),
114 alias,
115 condition,
116 });
117
118 if let Some(async_graphql::Value::Object(ref obj)) = m.select_where_value {
119 let agg_expr = if func == "COUNT" && dim_col == "*" {
120 "COUNT(*)".to_string()
121 } else if func == "UNIQ" {
122 format!("COUNT(DISTINCT `{dim_col}`)")
123 } else {
124 format!("{func}(`{dim_col}`)")
125 };
126
127 let h = parse_select_where_from_value(obj, &agg_expr)?;
128 if !h.is_empty() {
129 having = if having.is_empty() {
130 h
131 } else {
132 FilterNode::And(vec![having, h])
133 };
134 }
135 }
136 }
137 }
138
139 let limit_by = parse_limit_by(args, cube)?;
140
141 Ok(QueryIR {
142 cube: cube.name.clone(),
143 schema: cube.schema.clone(),
144 table,
145 selects,
146 filters,
147 having,
148 group_by,
149 order_by,
150 limit,
151 offset,
152 limit_by,
153 use_final: cube.use_final,
154 })
155}
156
157fn parse_select_where_from_value(
160 obj: &indexmap::IndexMap<async_graphql::Name, async_graphql::Value>,
161 aggregate_expr: &str,
162) -> Result<FilterNode, async_graphql::Error> {
163 let mut conditions = Vec::new();
164
165 for (key, op) in &[
166 ("eq", CompareOp::Eq),
167 ("gt", CompareOp::Gt),
168 ("ge", CompareOp::Ge),
169 ("lt", CompareOp::Lt),
170 ("le", CompareOp::Le),
171 ] {
172 if let Some(val) = obj.get(*key) {
173 let sql_val = match val {
174 async_graphql::Value::String(s) => {
175 if let Ok(f) = s.parse::<f64>() {
176 SqlValue::Float(f)
177 } else {
178 SqlValue::String(s.clone())
179 }
180 }
181 async_graphql::Value::Number(n) => {
182 if let Some(f) = n.as_f64() {
183 SqlValue::Float(f)
184 } else {
185 SqlValue::Int(n.as_i64().unwrap_or(0))
186 }
187 }
188 _ => continue,
189 };
190 conditions.push(FilterNode::Condition {
191 column: aggregate_expr.to_string(),
192 op: op.clone(),
193 value: sql_val,
194 });
195 }
196 }
197
198 Ok(match conditions.len() {
199 0 => FilterNode::Empty,
200 1 => conditions.into_iter().next().unwrap(),
201 _ => FilterNode::And(conditions),
202 })
203}
204
205fn merge_selector_filters(
206 base: FilterNode,
207 args: &ObjectAccessor,
208 selectors: &[SelectorDef],
209) -> Result<FilterNode, async_graphql::Error> {
210 let mut extra = Vec::new();
211
212 for sel in selectors {
213 if let Ok(val) = args.try_get(&sel.graphql_name) {
214 if let Ok(obj) = val.object() {
215 let leaf_filters =
216 filter::parse_leaf_filter_for_selector(&obj, &sel.column, &sel.dim_type)?;
217 extra.extend(leaf_filters);
218 }
219 }
220 }
221
222 if extra.is_empty() {
223 return Ok(base);
224 }
225 if base.is_empty() {
226 return Ok(if extra.len() == 1 {
227 extra.remove(0)
228 } else {
229 FilterNode::And(extra)
230 });
231 }
232 extra.push(base);
233 Ok(FilterNode::And(extra))
234}
235
236fn apply_default_filters(user_filters: FilterNode, defaults: &[(String, String)]) -> FilterNode {
237 if defaults.is_empty() {
238 return user_filters;
239 }
240
241 let mut default_nodes: Vec<FilterNode> = defaults
242 .iter()
243 .map(|(col, val)| {
244 let sql_val = if val == "true" || val == "false" {
245 SqlValue::Bool(val == "true")
246 } else if let Ok(n) = val.parse::<i64>() {
247 SqlValue::Int(n)
248 } else {
249 SqlValue::String(val.clone())
250 };
251 FilterNode::Condition {
252 column: col.clone(),
253 op: CompareOp::Eq,
254 value: sql_val,
255 }
256 })
257 .collect();
258
259 if user_filters.is_empty() {
260 if default_nodes.len() == 1 {
261 return default_nodes.remove(0);
262 }
263 return FilterNode::And(default_nodes);
264 }
265
266 default_nodes.push(user_filters);
267 FilterNode::And(default_nodes)
268}
269
270fn parse_limit(
271 args: &ObjectAccessor,
272 default: u32,
273 max: u32,
274) -> Result<(u32, u32), async_graphql::Error> {
275 let mut limit = default;
276 let mut offset = 0u32;
277
278 if let Ok(limit_val) = args.try_get("limit") {
279 if let Ok(limit_obj) = limit_val.object() {
280 if let Ok(count) = limit_obj.try_get("count") {
281 limit = (count.i64()? as u32).min(max);
282 }
283 if let Ok(off) = limit_obj.try_get("offset") {
284 offset = off.i64()? as u32;
285 }
286 }
287 }
288
289 Ok((limit, offset))
290}
291
292fn parse_order_by(
293 args: &ObjectAccessor,
294 cube: &CubeDefinition,
295) -> Result<Vec<OrderExpr>, async_graphql::Error> {
296 let flat = cube.flat_dimensions();
297
298 if let Ok(list_val) = args.try_get("orderByList") {
299 if let Ok(list) = list_val.list() {
300 let mut orders = Vec::new();
301 for item in list.iter() {
302 let obj = item.object()
303 .map_err(|_| async_graphql::Error::new("orderByList items must be objects"))?;
304 let field_accessor = obj.try_get("field")
305 .map_err(|_| async_graphql::Error::new("orderByList item requires 'field'"))?;
306 let field_str = field_accessor.enum_name()
307 .map_err(|_| async_graphql::Error::new("orderByList 'field' must be an enum value"))?;
308 let descending = if let Ok(dir_accessor) = obj.try_get("direction") {
309 dir_accessor.enum_name() == Ok("DESC")
310 } else {
311 false
312 };
313 let column = flat.iter()
314 .find(|(p, _)| p == field_str)
315 .map(|(_, dim)| dim.column.clone())
316 .ok_or_else(|| async_graphql::Error::new(format!("Unknown orderBy field: {field_str}")))?;
317 orders.push(OrderExpr { column, descending });
318 }
319 if !orders.is_empty() {
320 return Ok(orders);
321 }
322 }
323 }
324
325 let order_val = match args.try_get("orderBy") {
326 Ok(v) => v,
327 Err(_) => return Ok(Vec::new()),
328 };
329
330 let enum_str = order_val
331 .enum_name()
332 .map_err(|_| async_graphql::Error::new("orderBy must be an enum value"))?;
333
334 let (descending, field_path) = if let Some(path) = enum_str.strip_suffix("_DESC") {
335 (true, path)
336 } else if let Some(path) = enum_str.strip_suffix("_ASC") {
337 (false, path)
338 } else {
339 return Err(async_graphql::Error::new(format!(
340 "Invalid orderBy value: {enum_str}"
341 )));
342 };
343
344 let column = flat
345 .iter()
346 .find(|(p, _)| p == field_path)
347 .map(|(_, dim)| dim.column.clone())
348 .ok_or_else(|| {
349 async_graphql::Error::new(format!("Unknown orderBy field: {field_path}"))
350 })?;
351
352 Ok(vec![OrderExpr { column, descending }])
353}
354
355fn compile_filter_inline(node: &FilterNode) -> String {
358 match node {
359 FilterNode::Empty => String::new(),
360 FilterNode::Condition { column, op, value } => {
361 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
362 if op.is_unary() {
363 return format!("{col} {}", op.sql_op());
364 }
365 let val_str = match value {
366 SqlValue::String(s) => format!("'{}'", s.replace('\'', "\\'")),
367 SqlValue::Int(i) => i.to_string(),
368 SqlValue::Float(f) => f.to_string(),
369 SqlValue::Bool(b) => if *b { "1".to_string() } else { "0".to_string() },
370 };
371 match op {
372 CompareOp::In | CompareOp::NotIn => {
373 if let SqlValue::String(csv) = value {
374 let items: Vec<String> = csv.split(',')
375 .map(|s| format!("'{}'", s.trim().replace('\'', "\\'")))
376 .collect();
377 format!("{col} {} ({})", op.sql_op(), items.join(", "))
378 } else {
379 format!("{col} {} ({val_str})", op.sql_op())
380 }
381 }
382 CompareOp::Includes => {
383 let like_val = match value {
384 SqlValue::String(s) => format!("'%{}%'", s.replace('\'', "\\'")),
385 _ => val_str,
386 };
387 format!("{col} LIKE {like_val}")
388 }
389 _ => format!("{col} {} {val_str}", op.sql_op()),
390 }
391 }
392 FilterNode::And(children) => {
393 let parts: Vec<String> = children.iter()
394 .map(compile_filter_inline)
395 .filter(|s| !s.is_empty())
396 .collect();
397 match parts.len() {
398 0 => String::new(),
399 1 => parts.into_iter().next().unwrap(),
400 _ => format!("({})", parts.join(" AND ")),
401 }
402 }
403 FilterNode::Or(children) => {
404 let parts: Vec<String> = children.iter()
405 .map(compile_filter_inline)
406 .filter(|s| !s.is_empty())
407 .collect();
408 match parts.len() {
409 0 => String::new(),
410 1 => parts.into_iter().next().unwrap(),
411 _ => format!("({})", parts.join(" OR ")),
412 }
413 }
414 }
415}
416
417fn parse_limit_by(
418 args: &ObjectAccessor,
419 cube: &CubeDefinition,
420) -> Result<Option<LimitByExpr>, async_graphql::Error> {
421 let lb_val = match args.try_get("limitBy") {
422 Ok(v) => v,
423 Err(_) => return Ok(None),
424 };
425 let lb_obj = lb_val.object()?;
426 let count = lb_obj.try_get("count")?.i64()? as u32;
427 let offset = lb_obj
428 .try_get("offset")
429 .ok()
430 .and_then(|v| v.i64().ok())
431 .unwrap_or(0) as u32;
432 let by_str = lb_obj.try_get("by")?.string()?;
433
434 let flat = cube.flat_dimensions();
435 let columns: Vec<String> = by_str
436 .split(',')
437 .map(|s| {
438 let trimmed = s.trim();
439 flat.iter()
440 .find(|(path, _)| path == trimmed)
441 .map(|(_, dim)| dim.column.clone())
442 .unwrap_or_else(|| trimmed.to_string())
443 })
444 .collect();
445
446 if columns.is_empty() {
447 return Err(async_graphql::Error::new("limitBy.by must specify at least one field"));
448 }
449
450 Ok(Some(LimitByExpr { count, offset, columns }))
451}