1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Returns the row key used for a metric value: the alias prefixed with `__`.
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Returns the key for a dimension-aggregate alias: the alias prefixed with `__da_`.
pub fn dim_agg_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 5);
    key.push_str("__da_");
    key.push_str(alias);
    key
}
20
21pub type QueryExecutor = Arc<
24 dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
25 Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
26 > + Send + Sync,
27>;
28
29#[derive(Debug, Clone)]
31pub struct ChainGroupConfig {
32 pub name: String,
34 pub group: ChainGroup,
36 pub networks: Vec<String>,
38 pub has_network_arg: bool,
41}
42
43pub struct SchemaConfig {
45 pub chain_groups: Vec<ChainGroupConfig>,
46 pub root_query_name: String,
47 pub stats_callback: Option<StatsCallback>,
50}
51
52impl Default for SchemaConfig {
53 fn default() -> Self {
54 Self {
55 chain_groups: vec![
56 ChainGroupConfig {
57 name: "EVM".to_string(),
58 group: ChainGroup::Evm,
59 networks: vec!["eth".into(), "bsc".into()],
60 has_network_arg: true,
61 },
62 ChainGroupConfig {
63 name: "Solana".to_string(),
64 group: ChainGroup::Solana,
65 networks: vec!["sol".into()],
66 has_network_arg: false,
67 },
68 ChainGroupConfig {
69 name: "Trading".to_string(),
70 group: ChainGroup::Trading,
71 networks: vec!["sol".into(), "eth".into(), "bsc".into()],
72 has_network_arg: false,
73 },
74 ],
75 root_query_name: "ChainStream".to_string(),
76 stats_callback: None,
77 }
78 }
79}
80
/// Parent value produced by a chain-group wrapper field and read by the cube
/// fields nested under it.
struct ChainContext {
    /// Network selected for this wrapper (from the `network` argument or the
    /// group's default).
    network: String,
}
86
87pub fn build_schema(
99 registry: CubeRegistry,
100 dialect: Arc<dyn SqlDialect>,
101 executor: QueryExecutor,
102 config: SchemaConfig,
103) -> Result<Schema, SchemaError> {
104 let mut builder = Schema::build("Query", None, None);
105
106 builder = builder.register(filter_types::build_limit_input());
108 builder = builder.register(
109 InputObject::new("LimitByInput")
110 .description("Limit results per group (similar to ClickHouse LIMIT BY)")
111 .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
112 .description("Comma-separated dimension names to group by"))
113 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
114 .description("Maximum rows per group"))
115 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
116 .description("Rows to skip per group")),
117 );
118 builder = builder.register(
119 Enum::new("OrderDirection")
120 .description("Sort direction")
121 .item(EnumItem::new("ASC").description("Ascending"))
122 .item(EnumItem::new("DESC").description("Descending")),
123 );
124 for input in filter_types::build_filter_primitives() {
125 builder = builder.register(input);
126 }
127 builder = builder.register(
128 Enum::new("TimeUnit")
129 .description("Time unit for interval bucketing")
130 .item(EnumItem::new("seconds"))
131 .item(EnumItem::new("minutes"))
132 .item(EnumItem::new("hours"))
133 .item(EnumItem::new("days"))
134 .item(EnumItem::new("weeks"))
135 .item(EnumItem::new("months")),
136 );
137 builder = builder.register(
138 InputObject::new("TimeIntervalInput")
139 .description("Time bucketing interval for DateTime dimensions")
140 .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
141 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
142 );
143 builder = builder.register(
144 InputObject::new("DimSelectWhere")
145 .description("Post-aggregation filter for dimension values (HAVING clause)")
146 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
147 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
148 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
149 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
150 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
151 .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
152 );
153
154 for grp in &config.chain_groups {
156 if grp.has_network_arg {
157 let enum_name = format!("{}Network", grp.name);
158 let mut net_enum = Enum::new(&enum_name)
159 .description(format!("{} network selector", grp.name));
160 for net in &grp.networks {
161 net_enum = net_enum.item(EnumItem::new(net));
162 }
163 builder = builder.register(net_enum);
164 }
165 }
166
167 let mut registered_cubes: HashSet<String> = HashSet::new();
169 for cube in registry.cubes() {
170 if registered_cubes.contains(&cube.name) { continue; }
171 registered_cubes.insert(cube.name.clone());
172
173 let types = build_cube_types(cube);
174 for obj in types.objects { builder = builder.register(obj); }
175 for inp in types.inputs { builder = builder.register(inp); }
176 for en in types.enums { builder = builder.register(en); }
177 for un in types.unions { builder = builder.register(un); }
178 }
179
180 let mut query = Object::new("Query");
182
183 for grp in &config.chain_groups {
184 let wrapper_type_name = grp.name.clone();
185
186 let mut wrapper_obj = Object::new(&wrapper_type_name);
189
190 for cube in registry.cubes() {
191 if !cube.chain_groups.contains(&grp.group) { continue; }
192
193 let cube_field = build_cube_field(
194 cube,
195 dialect.clone(),
196 executor.clone(),
197 config.stats_callback.clone(),
198 );
199 wrapper_obj = wrapper_obj.field(cube_field);
200 }
201 builder = builder.register(wrapper_obj);
202
203 let default_network = grp.networks.first().cloned().unwrap_or_default();
205 let has_net_arg = grp.has_network_arg;
206 let net_enum_name = format!("{}Network", grp.name);
207 let wrapper_type_for_resolver = wrapper_type_name.clone();
208
209 let mut wrapper_field = Field::new(
210 &wrapper_type_name,
211 TypeRef::named_nn(&wrapper_type_name),
212 move |ctx| {
213 let default = default_network.clone();
214 let has_arg = has_net_arg;
215 FieldFuture::new(async move {
216 let network = if has_arg {
217 ctx.args.try_get("network")
218 .ok()
219 .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
220 .unwrap_or(default)
221 } else {
222 default
223 };
224 Ok(Some(FieldValue::owned_any(ChainContext { network })))
225 })
226 },
227 );
228
229 if grp.has_network_arg {
230 wrapper_field = wrapper_field.argument(
231 InputValue::new("network", TypeRef::named_nn(&net_enum_name))
232 .description(format!("{} network to query", wrapper_type_for_resolver)),
233 );
234 }
235
236 query = query.field(wrapper_field);
237 }
238
239 let metadata_registry = Arc::new(registry.clone());
241 let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
242 .map(|g| (g.name.clone(), g.group.clone()))
243 .collect();
244 let metadata_field = Field::new(
245 "_cubeMetadata",
246 TypeRef::named_nn(TypeRef::STRING),
247 move |_ctx| {
248 let reg = metadata_registry.clone();
249 let groups = metadata_groups.clone();
250 FieldFuture::new(async move {
251 let mut group_metadata: Vec<serde_json::Value> = Vec::new();
252 for (group_name, group_enum) in &groups {
253 let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
254 .filter(|c| c.chain_groups.contains(group_enum))
255 .map(serialize_cube_metadata)
256 .collect();
257 group_metadata.push(serde_json::json!({
258 "group": group_name,
259 "cubes": cubes_in_group,
260 }));
261 }
262 let json = serde_json::to_string(&group_metadata).unwrap_or_default();
263 Ok(Some(FieldValue::value(Value::from(json))))
264 })
265 },
266 )
267 .description("Internal: returns JSON metadata about all cubes grouped by chain");
268 query = query.field(metadata_field);
269
270 builder = builder.register(query);
271 builder = builder.data(registry);
272
273 builder.finish()
274}
275
276fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
277 serde_json::json!({
278 "name": cube.name,
279 "description": cube.description,
280 "schema": cube.schema,
281 "tablePattern": cube.table_pattern,
282 "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
283 "metrics": cube.metrics.iter().map(|m| {
284 let mut obj = serde_json::json!({
285 "name": m.name,
286 "returnType": format!("{:?}", m.return_type),
287 });
288 if let Some(ref tmpl) = m.expression_template {
289 obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
290 }
291 if let Some(ref desc) = m.description {
292 obj["description"] = serde_json::Value::String(desc.clone());
293 }
294 obj
295 }).collect::<Vec<_>>(),
296 "selectors": cube.selectors.iter().map(|s| {
297 serde_json::json!({
298 "name": s.graphql_name,
299 "column": s.column,
300 "type": format!("{:?}", s.dim_type),
301 })
302 }).collect::<Vec<_>>(),
303 "dimensions": serialize_dims(&cube.dimensions),
304 "joins": cube.joins.iter().map(|j| {
305 serde_json::json!({
306 "field": j.field_name,
307 "target": j.target_cube,
308 "joinType": format!("{:?}", j.join_type),
309 })
310 }).collect::<Vec<_>>(),
311 "tableRoutes": cube.table_routes.iter().map(|r| {
312 serde_json::json!({
313 "schema": r.schema,
314 "tablePattern": r.table_pattern,
315 "availableColumns": r.available_columns,
316 "priority": r.priority,
317 })
318 }).collect::<Vec<_>>(),
319 "defaultLimit": cube.default_limit,
320 "maxLimit": cube.max_limit,
321 })
322}
323
324fn build_cube_field(
326 cube: &CubeDefinition,
327 dialect: Arc<dyn SqlDialect>,
328 executor: QueryExecutor,
329 stats_cb: Option<StatsCallback>,
330) -> Field {
331 let cube_name = cube.name.clone();
332 let orderby_input_name = format!("{}OrderByInput", cube.name);
333 let cube_description = cube.description.clone();
334
335 let mut field = Field::new(
336 &cube.name,
337 TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
338 move |ctx| {
339 let cube_name = cube_name.clone();
340 let dialect = dialect.clone();
341 let executor = executor.clone();
342 let stats_cb = stats_cb.clone();
343 FieldFuture::new(async move {
344 let registry = ctx.ctx.data::<CubeRegistry>()?;
345
346 let network = ctx.parent_value
347 .try_downcast_ref::<ChainContext>()
348 .map(|c| c.network.as_str())
349 .unwrap_or("sol");
350
351 let cube_def = registry.get(&cube_name).ok_or_else(|| {
352 async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
353 })?;
354
355 let metric_requests = extract_metric_requests(&ctx, cube_def);
356 let quantile_requests = extract_quantile_requests(&ctx);
357 let calculate_requests = extract_calculate_requests(&ctx);
358 let field_aliases = extract_field_aliases(&ctx, cube_def);
359 let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
360 let time_intervals = extract_time_interval_requests(&ctx, cube_def);
361 let requested = extract_requested_fields(&ctx, cube_def);
362 let mut ir = compiler::parser::parse_cube_query(
363 cube_def,
364 network,
365 &ctx.args,
366 &metric_requests,
367 &quantile_requests,
368 &calculate_requests,
369 &field_aliases,
370 &dim_agg_requests,
371 &time_intervals,
372 Some(requested),
373 )?;
374
375 let mut join_idx = 0usize;
376 for sub_field in ctx.ctx.field().selection_set() {
377 let fname = sub_field.name().to_string();
378 let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
379 if let Some(jd) = join_def {
380 if let Some(target_cube) = registry.get(&jd.target_cube) {
381 let join_expr = build_join_expr(
382 jd, target_cube, &sub_field, network, join_idx,
383 );
384 ir.joins.push(join_expr);
385 join_idx += 1;
386 }
387 }
388 }
389
390 let validated = compiler::validator::validate(ir)?;
391 let result = dialect.compile(&validated);
392 let sql = result.sql;
393 let bindings = result.bindings;
394
395 let rows = executor(sql.clone(), bindings).await.map_err(|e| {
396 async_graphql::Error::new(format!("Query execution failed: {e}"))
397 })?;
398
399 let rows = if result.alias_remap.is_empty() {
400 rows
401 } else {
402 rows.into_iter().map(|mut row| {
403 for (alias, original) in &result.alias_remap {
404 if let Some(val) = row.shift_remove(alias) {
405 row.entry(original.clone()).or_insert(val);
406 }
407 }
408 row
409 }).collect()
410 };
411
412 let rows: Vec<RowMap> = if validated.joins.is_empty() {
413 rows
414 } else {
415 rows.into_iter().map(|mut row| {
416 for join in &validated.joins {
417 let prefix = format!("{}.", join.alias);
418 let mut sub_row = RowMap::new();
419 let keys: Vec<String> = row.keys()
420 .filter(|k| k.starts_with(&prefix))
421 .cloned()
422 .collect();
423 for key in keys {
424 if let Some(val) = row.shift_remove(&key) {
425 sub_row.insert(key[prefix.len()..].to_string(), val);
426 }
427 }
428 let obj: serde_json::Map<String, serde_json::Value> =
429 sub_row.into_iter().collect();
430 row.insert(
431 join.join_field.clone(),
432 serde_json::Value::Object(obj),
433 );
434 }
435 row
436 }).collect()
437 };
438
439 let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
440 .or_else(|| stats_cb.clone());
441 if let Some(cb) = effective_cb {
442 let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
443 cb(stats);
444 }
445
446 let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
447 Ok(Some(FieldValue::list(values)))
448 })
449 },
450 );
451 if !cube_description.is_empty() {
452 field = field.description(&cube_description);
453 }
454 field = field
455 .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
456 .description("Filter conditions"))
457 .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
458 .description("Pagination control"))
459 .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
460 .description("Per-group row limit"))
461 .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
462 .description("Sort order (Bitquery-compatible)"));
463
464 for sel in &cube.selectors {
465 let filter_type = dim_type_to_filter_name(&sel.dim_type);
466 field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
467 .description(format!("Shorthand filter for {}", sel.graphql_name)));
468 }
469
470 field
471}
472
473fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
474 serde_json::Value::Array(dims.iter().map(|d| match d {
475 DimensionNode::Leaf(dim) => {
476 let mut obj = serde_json::json!({
477 "name": dim.graphql_name,
478 "column": dim.column,
479 "type": format!("{:?}", dim.dim_type),
480 });
481 if let Some(desc) = &dim.description {
482 obj["description"] = serde_json::Value::String(desc.clone());
483 }
484 obj
485 },
486 DimensionNode::Group { graphql_name, description, children } => {
487 let mut obj = serde_json::json!({
488 "name": graphql_name,
489 "children": serialize_dims(children),
490 });
491 if let Some(desc) = description {
492 obj["description"] = serde_json::Value::String(desc.clone());
493 }
494 obj
495 },
496 DimensionNode::Array { graphql_name, description, children } => {
497 let fields: Vec<serde_json::Value> = children.iter().map(|f| {
498 serde_json::json!({
499 "name": f.graphql_name,
500 "column": f.column,
501 "type": format!("{:?}", f.field_type),
502 })
503 }).collect();
504 let mut obj = serde_json::json!({
505 "name": graphql_name,
506 "kind": "array",
507 "fields": fields,
508 });
509 if let Some(desc) = description {
510 obj["description"] = serde_json::Value::String(desc.clone());
511 }
512 obj
513 },
514 }).collect())
515}
516
517fn extract_metric_requests(
521 ctx: &async_graphql::dynamic::ResolverContext,
522 cube: &CubeDefinition,
523) -> Vec<compiler::parser::MetricRequest> {
524 let mut requests = Vec::new();
525
526 for sub_field in ctx.ctx.field().selection_set() {
527 let name = sub_field.name();
528 if !cube.has_metric(name) {
529 continue;
530 }
531
532 let args = match sub_field.arguments() {
533 Ok(args) => args,
534 Err(_) => continue,
535 };
536
537 let of_dimension = args
538 .iter()
539 .find(|(k, _)| k.as_str() == "of")
540 .and_then(|(_, v)| match v {
541 async_graphql::Value::Enum(e) => Some(e.to_string()),
542 async_graphql::Value::String(s) => Some(s.clone()),
543 _ => None,
544 })
545 .unwrap_or_else(|| "*".to_string());
546
547 let select_where_value = args
548 .iter()
549 .find(|(k, _)| k.as_str() == "selectWhere")
550 .map(|(_, v)| v.clone());
551
552 let condition_filter = args
553 .iter()
554 .find(|(k, _)| k.as_str() == "if")
555 .and_then(|(_, v)| {
556 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
557 .and_then(|f| if f.is_empty() { None } else { Some(f) })
558 });
559
560 let gql_alias = sub_field.alias().unwrap_or(name).to_string();
561 requests.push(compiler::parser::MetricRequest {
562 function: name.to_string(),
563 alias: gql_alias,
564 of_dimension,
565 select_where_value,
566 condition_filter,
567 });
568 }
569
570 requests
571}
572
573fn extract_requested_fields(
574 ctx: &async_graphql::dynamic::ResolverContext,
575 cube: &CubeDefinition,
576) -> HashSet<String> {
577 let mut fields = HashSet::new();
578 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
579 fields
580}
581
582fn collect_selection_paths(
583 field: &async_graphql::SelectionField<'_>,
584 prefix: &str,
585 out: &mut HashSet<String>,
586 metrics: &[MetricDef],
587) {
588 for sub in field.selection_set() {
589 let name = sub.name();
590 if metrics.iter().any(|m| m.name == name) {
591 continue;
592 }
593 let path = if prefix.is_empty() {
594 name.to_string()
595 } else {
596 format!("{prefix}_{name}")
597 };
598 let has_children = sub.selection_set().next().is_some();
599 if has_children {
600 collect_selection_paths(&sub, &path, out, metrics);
601 } else {
602 out.insert(path);
603 }
604 }
605}
606
/// List of `(alias path, column)` pairs for aliased dimension fields.
pub type FieldAliasMap = Vec<(String, String)>;
610
/// A `quantile` field found in the selection set.
pub struct QuantileRequest {
    /// GraphQL alias of the field, or `"quantile"` when none was given.
    pub alias: String,
    /// Value of the `of` argument, or `"*"` when it is missing.
    pub of_dimension: String,
    /// Value of the `level` argument, or `0.5` when it is missing.
    pub level: f64,
}
617
/// A `calculate` field found in the selection set.
pub struct CalculateRequest {
    /// GraphQL alias of the field, or `"calculate"` when none was given.
    pub alias: String,
    /// Value of the `expression` argument.
    pub expression: String,
}
623
/// A dimension field selected with an `interval` argument.
pub struct TimeIntervalRequest {
    /// Underscore-joined selection path of the field.
    pub field_path: String,
    /// GraphQL alias of the field, or its name when none was given.
    pub graphql_alias: String,
    /// Column of the dimension found at `field_path`.
    pub column: String,
    /// Value of the interval's `in` entry.
    pub unit: String,
    /// Value of the interval's `count` entry.
    pub count: i64,
}
633
634pub struct DimAggRequest {
637 pub field_path: String,
638 pub graphql_alias: String,
639 pub value_column: String,
640 pub agg_type: DimAggType,
641 pub compare_column: String,
642 pub condition_filter: Option<FilterNode>,
643 pub select_where_value: Option<async_graphql::Value>,
644}
645
646fn extract_quantile_requests(
647 ctx: &async_graphql::dynamic::ResolverContext,
648) -> Vec<QuantileRequest> {
649 let mut requests = Vec::new();
650 for sub_field in ctx.ctx.field().selection_set() {
651 if sub_field.name() != "quantile" { continue; }
652 let args = match sub_field.arguments() {
653 Ok(a) => a,
654 Err(_) => continue,
655 };
656 let of_dim = args.iter()
657 .find(|(k, _)| k.as_str() == "of")
658 .and_then(|(_, v)| match v {
659 async_graphql::Value::Enum(e) => Some(e.to_string()),
660 async_graphql::Value::String(s) => Some(s.clone()),
661 _ => None,
662 })
663 .unwrap_or_else(|| "*".to_string());
664 let level = args.iter()
665 .find(|(k, _)| k.as_str() == "level")
666 .and_then(|(_, v)| match v {
667 async_graphql::Value::Number(n) => n.as_f64(),
668 _ => None,
669 })
670 .unwrap_or(0.5);
671 let alias = sub_field.alias().unwrap_or("quantile").to_string();
672 requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
673 }
674 requests
675}
676
677fn extract_field_aliases(
678 ctx: &async_graphql::dynamic::ResolverContext,
679 cube: &CubeDefinition,
680) -> FieldAliasMap {
681 let flat = cube.flat_dimensions();
682 let mut aliases = Vec::new();
683 collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
684 aliases
685}
686
687fn collect_field_aliases(
688 field: &async_graphql::SelectionField<'_>,
689 prefix: &str,
690 flat: &[(String, crate::cube::definition::Dimension)],
691 metrics: &[MetricDef],
692 out: &mut FieldAliasMap,
693) {
694 for sub in field.selection_set() {
695 let name = sub.name();
696 if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
697 continue;
698 }
699 let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
700 let has_children = sub.selection_set().next().is_some();
701 if has_children {
702 collect_field_aliases(&sub, &path, flat, metrics, out);
703 } else if let Some(alias) = sub.alias() {
704 if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
705 let alias_path = if prefix.is_empty() {
706 alias.to_string()
707 } else {
708 format!("{prefix}_{alias}")
709 };
710 out.push((alias_path, dim.column.clone()));
711 }
712 }
713 }
714}
715
716fn extract_calculate_requests(
717 ctx: &async_graphql::dynamic::ResolverContext,
718) -> Vec<CalculateRequest> {
719 let mut requests = Vec::new();
720 for sub_field in ctx.ctx.field().selection_set() {
721 if sub_field.name() != "calculate" { continue; }
722 let args = match sub_field.arguments() {
723 Ok(a) => a,
724 Err(_) => continue,
725 };
726 let expression = args.iter()
727 .find(|(k, _)| k.as_str() == "expression")
728 .and_then(|(_, v)| match v {
729 async_graphql::Value::String(s) => Some(s.clone()),
730 _ => None,
731 });
732 if let Some(expr) = expression {
733 let alias = sub_field.alias().unwrap_or("calculate").to_string();
734 requests.push(CalculateRequest { alias, expression: expr });
735 }
736 }
737 requests
738}
739
740fn extract_dim_agg_requests(
741 ctx: &async_graphql::dynamic::ResolverContext,
742 cube: &CubeDefinition,
743) -> Vec<DimAggRequest> {
744 let flat = cube.flat_dimensions();
745 let mut requests = Vec::new();
746 collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
747 requests
748}
749
750fn collect_dim_agg_paths(
751 field: &async_graphql::SelectionField<'_>,
752 prefix: &str,
753 flat: &[(String, crate::cube::definition::Dimension)],
754 metrics: &[MetricDef],
755 cube: &CubeDefinition,
756 out: &mut Vec<DimAggRequest>,
757) {
758 for sub in field.selection_set() {
759 let name = sub.name();
760 if metrics.iter().any(|m| m.name == name) {
761 continue;
762 }
763 let path = if prefix.is_empty() {
764 name.to_string()
765 } else {
766 format!("{prefix}_{name}")
767 };
768 let has_children = sub.selection_set().next().is_some();
769 if has_children {
770 collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
771 } else {
772 let args = match sub.arguments() {
773 Ok(a) => a,
774 Err(_) => continue,
775 };
776 let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
777 let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
778
779 let (agg_type, compare_path) = if let Some((_, v)) = max_val {
780 let cp = match v {
781 async_graphql::Value::Enum(e) => e.to_string(),
782 async_graphql::Value::String(s) => s.clone(),
783 _ => continue,
784 };
785 (DimAggType::ArgMax, cp)
786 } else if let Some((_, v)) = min_val {
787 let cp = match v {
788 async_graphql::Value::Enum(e) => e.to_string(),
789 async_graphql::Value::String(s) => s.clone(),
790 _ => continue,
791 };
792 (DimAggType::ArgMin, cp)
793 } else {
794 continue;
795 };
796
797 let value_column = flat.iter()
798 .find(|(p, _)| p == &path)
799 .map(|(_, dim)| dim.column.clone());
800 let compare_column = flat.iter()
801 .find(|(p, _)| p == &compare_path)
802 .map(|(_, dim)| dim.column.clone());
803
804 if let (Some(vc), Some(cc)) = (value_column, compare_column) {
805 let condition_filter = args.iter()
806 .find(|(k, _)| k.as_str() == "if")
807 .and_then(|(_, v)| {
808 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
809 .and_then(|f| if f.is_empty() { None } else { Some(f) })
810 });
811 let select_where_value = args.iter()
812 .find(|(k, _)| k.as_str() == "selectWhere")
813 .map(|(_, v)| v.clone());
814
815 let gql_alias = sub.alias()
816 .map(|a| a.to_string())
817 .unwrap_or_else(|| path.clone());
818 out.push(DimAggRequest {
819 field_path: path,
820 graphql_alias: gql_alias,
821 value_column: vc,
822 agg_type,
823 compare_column: cc,
824 condition_filter,
825 select_where_value,
826 });
827 }
828 }
829 }
830}
831
832fn extract_time_interval_requests(
833 ctx: &async_graphql::dynamic::ResolverContext,
834 cube: &CubeDefinition,
835) -> Vec<TimeIntervalRequest> {
836 let flat = cube.flat_dimensions();
837 let mut requests = Vec::new();
838 collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
839 requests
840}
841
842fn collect_time_interval_paths(
843 field: &async_graphql::SelectionField<'_>,
844 prefix: &str,
845 flat: &[(String, crate::cube::definition::Dimension)],
846 metrics: &[MetricDef],
847 out: &mut Vec<TimeIntervalRequest>,
848) {
849 for sub in field.selection_set() {
850 let name = sub.name();
851 if metrics.iter().any(|m| m.name == name) { continue; }
852 let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
853 let has_children = sub.selection_set().next().is_some();
854 if has_children {
855 collect_time_interval_paths(&sub, &path, flat, metrics, out);
856 } else {
857 let args = match sub.arguments() {
858 Ok(a) => a,
859 Err(_) => continue,
860 };
861 let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
862 if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
863 let unit = obj.get("in")
864 .and_then(|v| match v {
865 async_graphql::Value::Enum(e) => Some(e.to_string()),
866 async_graphql::Value::String(s) => Some(s.clone()),
867 _ => None,
868 });
869 let count = obj.get("count")
870 .and_then(|v| match v {
871 async_graphql::Value::Number(n) => n.as_i64(),
872 _ => None,
873 });
874 if let (Some(unit), Some(count)) = (unit, count) {
875 if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
876 let gql_alias = sub.alias().unwrap_or(name).to_string();
877 out.push(TimeIntervalRequest {
878 field_path: path,
879 graphql_alias: gql_alias,
880 column: dim.column.clone(),
881 unit,
882 count,
883 });
884 }
885 }
886 }
887 }
888 }
889}
890
891struct CubeTypes {
896 objects: Vec<Object>,
897 inputs: Vec<InputObject>,
898 enums: Vec<Enum>,
899 unions: Vec<Union>,
900}
901
902fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
903 let record_name = format!("{}Record", cube.name);
904 let filter_name = format!("{}Filter", cube.name);
905 let compare_enum_name = format!("{}CompareFields", cube.name);
906
907 let flat_dims = cube.flat_dimensions();
908
909 let mut compare_enum = Enum::new(&compare_enum_name)
910 .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
911 for (path, _) in &flat_dims {
912 compare_enum = compare_enum.item(EnumItem::new(path));
913 }
914
915 let mut record_fields: Vec<Field> = Vec::new();
916 let mut filter_fields: Vec<InputValue> = Vec::new();
917 let mut extra_objects: Vec<Object> = Vec::new();
918 let mut extra_inputs: Vec<InputObject> = Vec::new();
919 let mut extra_unions: Vec<Union> = Vec::new();
920
921 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
922 .description("OR combinator — matches if any sub-filter matches"));
923
924 {
925 let mut collector = DimCollector {
926 cube_name: &cube.name,
927 compare_enum_name: &compare_enum_name,
928 filter_name: &filter_name,
929 record_fields: &mut record_fields,
930 filter_fields: &mut filter_fields,
931 extra_objects: &mut extra_objects,
932 extra_inputs: &mut extra_inputs,
933 extra_unions: &mut extra_unions,
934 };
935 for node in &cube.dimensions {
936 collect_dimension_types(node, "", &mut collector);
937 }
938 }
939
940 let mut metric_enums: Vec<Enum> = Vec::new();
941 let builtin_descs: std::collections::HashMap<&str, &str> = [
942 ("count", "Count of rows or distinct values"),
943 ("sum", "Sum of values"),
944 ("avg", "Average of values"),
945 ("min", "Minimum value"),
946 ("max", "Maximum value"),
947 ("uniq", "Count of unique (distinct) values"),
948 ].into_iter().collect();
949
950 for metric in &cube.metrics {
951 let metric_name = &metric.name;
952 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
953
954 if metric.supports_where {
955 extra_inputs.push(
956 InputObject::new(&select_where_name)
957 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
958 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
959 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
960 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
961 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
962 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
963 );
964 }
965
966 let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
967 let mut of_enum = Enum::new(&of_enum_name)
968 .description(format!("Dimension to apply {} aggregation on", metric_name));
969 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
970 metric_enums.push(of_enum);
971
972 let metric_name_clone = metric_name.clone();
973 let return_type_ref = dim_type_to_typeref(&metric.return_type);
974 let metric_desc = metric.description.as_deref()
975 .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
976 .unwrap_or("Aggregate metric");
977
978 let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
979 let default_name = metric_name_clone.clone();
980 FieldFuture::new(async move {
981 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
982 let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
983 let key = metric_key(alias);
984 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
985 Ok(Some(FieldValue::value(json_to_gql_value(val))))
986 })
987 })
988 .description(metric_desc)
989 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
990 .description("Dimension to aggregate on (default: all rows)"));
991
992 if metric.supports_where {
993 metric_field = metric_field
994 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
995 .description("Post-aggregation filter (HAVING)"))
996 .argument(InputValue::new("if", TypeRef::named(&filter_name))
997 .description("Conditional filter for this metric"));
998 }
999
1000 record_fields.push(metric_field);
1001 }
1002
1003 {
1005 let of_enum_name = format!("{}_quantile_Of", cube.name);
1006 let mut of_enum = Enum::new(&of_enum_name)
1007 .description(format!("Dimension to apply quantile on for {}", cube.name));
1008 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1009 metric_enums.push(of_enum);
1010
1011 let of_enum_for_closure = of_enum_name.clone();
1012 let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1013 FieldFuture::new(async move {
1014 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1015 let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1016 let key = metric_key(alias);
1017 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1018 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1019 })
1020 })
1021 .description("Compute a quantile (percentile) of a dimension")
1022 .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1023 .description("Dimension to compute quantile on"))
1024 .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1025 .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1026 record_fields.push(quantile_field);
1027 }
1028
1029 {
1031 let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1032 FieldFuture::new(async move {
1033 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1034 let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1035 let key = metric_key(alias);
1036 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1037 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1038 })
1039 })
1040 .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1041 .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1042 .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1043 record_fields.push(calculate_field);
1044 }
1045
1046 for jd in &cube.joins {
1048 let target_record_name = format!("{}Record", jd.target_cube);
1049 let field_name_owned = jd.field_name.clone();
1050 let mut join_field = Field::new(
1051 &jd.field_name,
1052 TypeRef::named(&target_record_name),
1053 move |ctx| {
1054 let field_name = field_name_owned.clone();
1055 FieldFuture::new(async move {
1056 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1057 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1058 let sub_row: RowMap = obj.iter()
1059 .map(|(k, v)| (k.clone(), v.clone()))
1060 .collect();
1061 Ok(Some(FieldValue::owned_any(sub_row)))
1062 } else {
1063 Ok(Some(FieldValue::value(Value::Null)))
1064 }
1065 })
1066 },
1067 );
1068 if let Some(desc) = &jd.description {
1069 join_field = join_field.description(desc);
1070 }
1071 record_fields.push(join_field);
1072 }
1073
1074 let mut record = Object::new(&record_name);
1075 for f in record_fields { record = record.field(f); }
1076
1077 let mut filter = InputObject::new(&filter_name)
1078 .description(format!("Filter conditions for {} query", cube.name));
1079 for f in filter_fields { filter = filter.field(f); }
1080
1081 let orderby_input_name = format!("{}OrderByInput", cube.name);
1082 let orderby_input = InputObject::new(&orderby_input_name)
1083 .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1084 .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1085 .description("Sort descending by this field"))
1086 .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1087 .description("Sort ascending by this field"))
1088 .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1089 .description("Sort descending by computed/aggregated field name"))
1090 .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1091 .description("Sort ascending by computed/aggregated field name"));
1092
1093 let mut objects = vec![record]; objects.extend(extra_objects);
1094 let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
1095 let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1096
1097 CubeTypes { objects, inputs, enums, unions: extra_unions }
1098}
1099
/// Mutable accumulator threaded through `collect_dimension_types` while it
/// walks a cube's dimension tree.
///
/// The three names are read-only context; the five vectors are output sinks
/// borrowed from the caller so that recursive calls can append to them.
struct DimCollector<'a> {
    /// Cube name, used as the leading segment of every generated nested type name.
    cube_name: &'a str,
    /// Name of the enum type used for the `maximum` / `minimum` arguments on leaf fields.
    compare_enum_name: &'a str,
    /// Name of the filter input type used for the `if` argument on leaf fields.
    filter_name: &'a str,
    /// Output fields for the record object currently being built.
    record_fields: &'a mut Vec<Field>,
    /// Input fields for the filter input object currently being built.
    filter_fields: &'a mut Vec<InputValue>,
    /// Additional object types generated on the way (nested records, array
    /// elements, union variant objects).
    extra_objects: &'a mut Vec<Object>,
    /// Additional input object types generated on the way (nested filters,
    /// array "includes" filters).
    extra_inputs: &'a mut Vec<InputObject>,
    /// Union types generated for array children with a union field type.
    extra_unions: &'a mut Vec<Union>,
}
1110
1111fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1112 match node {
1113 DimensionNode::Leaf(dim) => {
1114 let col = dim.column.clone();
1115 let is_datetime = dim.dim_type == DimType::DateTime;
1116 let compare_enum = c.compare_enum_name.to_string();
1117 let cube_filter = c.filter_name.to_string();
1118 let mut leaf_field = Field::new(
1119 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1120 move |ctx| {
1121 let col = col.clone();
1122 FieldFuture::new(async move {
1123 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1124 let has_interval = ctx.args.try_get("interval").is_ok();
1125 let has_max = ctx.args.try_get("maximum").is_ok();
1126 let has_min = ctx.args.try_get("minimum").is_ok();
1127 let key = if has_interval || has_max || has_min {
1128 let name = ctx.ctx.field().alias().unwrap_or(&col);
1129 dim_agg_key(name)
1130 } else {
1131 col.clone()
1132 };
1133 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1134 let gql_val = if is_datetime {
1135 json_to_gql_datetime(val)
1136 } else {
1137 json_to_gql_value(val)
1138 };
1139 Ok(Some(FieldValue::value(gql_val)))
1140 })
1141 },
1142 );
1143 if let Some(desc) = &dim.description {
1144 leaf_field = leaf_field.description(desc);
1145 }
1146 leaf_field = leaf_field
1147 .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1148 .description("Return value from row where compare field is maximum (argMax)"))
1149 .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1150 .description("Return value from row where compare field is minimum (argMin)"))
1151 .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1152 .description("Conditional filter for aggregation"))
1153 .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1154 .description("Post-aggregation value filter (HAVING)"));
1155 if is_datetime {
1156 leaf_field = leaf_field
1157 .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1158 .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1159 }
1160 c.record_fields.push(leaf_field);
1163 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1164 }
1165 DimensionNode::Group { graphql_name, description, children } => {
1166 let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1167 let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1168 let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1169
1170 let mut child_record_fields: Vec<Field> = Vec::new();
1171 let mut child_filter_fields: Vec<InputValue> = Vec::new();
1172 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1173
1174 let mut child_collector = DimCollector {
1175 cube_name: c.cube_name,
1176 compare_enum_name: c.compare_enum_name,
1177 filter_name: c.filter_name,
1178 record_fields: &mut child_record_fields,
1179 filter_fields: &mut child_filter_fields,
1180 extra_objects: c.extra_objects,
1181 extra_inputs: c.extra_inputs,
1182 extra_unions: c.extra_unions,
1183 };
1184 for child in children {
1185 collect_dimension_types(child, &new_prefix, &mut child_collector);
1186 }
1187
1188 let mut nested_record = Object::new(&nested_record_name);
1189 for f in child_record_fields { nested_record = nested_record.field(f); }
1190
1191 let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1192 let mut nested_filter = InputObject::new(&nested_filter_name)
1193 .description(nested_filter_desc);
1194 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1195
1196 let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1197 FieldFuture::new(async move {
1198 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1199 Ok(Some(FieldValue::owned_any(row.clone())))
1200 })
1201 });
1202 if let Some(desc) = description {
1203 group_field = group_field.description(desc);
1204 }
1205 c.record_fields.push(group_field);
1206 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1207 c.extra_objects.push(nested_record);
1208 c.extra_inputs.push(nested_filter);
1209 }
1210 DimensionNode::Array { graphql_name, description, children } => {
1211 let full_path = if prefix.is_empty() {
1212 graphql_name.clone()
1213 } else {
1214 format!("{prefix}_{graphql_name}")
1215 };
1216 let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1217 let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1218
1219 let mut element_obj = Object::new(&element_type_name);
1220 let mut includes_filter = InputObject::new(&includes_filter_name)
1221 .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1222
1223 let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1224
1225 for child in children {
1226 match &child.field_type {
1227 crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1228 let col_name = child.column.clone();
1229 let is_datetime = *dt == DimType::DateTime;
1230 let mut field = Field::new(
1231 &child.graphql_name,
1232 dim_type_to_typeref(dt),
1233 move |ctx| {
1234 let col = col_name.clone();
1235 FieldFuture::new(async move {
1236 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1237 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1238 let gql_val = if is_datetime {
1239 json_to_gql_datetime(val)
1240 } else {
1241 json_to_gql_value(val)
1242 };
1243 Ok(Some(FieldValue::value(gql_val)))
1244 })
1245 },
1246 );
1247 if let Some(desc) = &child.description {
1248 field = field.description(desc);
1249 }
1250 element_obj = element_obj.field(field);
1251 includes_filter = includes_filter.field(
1252 InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1253 );
1254 }
1255 crate::cube::definition::ArrayFieldType::Union(variants) => {
1256 let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1257
1258 let mut gql_union = Union::new(&union_name);
1259 let mut variant_objects = Vec::new();
1260
1261 for v in variants {
1262 let variant_obj_name = v.type_name.clone();
1263 let field_name = v.field_name.clone();
1264 let source_type = v.source_type.clone();
1265 let type_ref = dim_type_to_typeref(&source_type);
1266
1267 let val_col = child.column.clone();
1268 let variant_obj = Object::new(&variant_obj_name)
1269 .field(Field::new(
1270 &field_name,
1271 type_ref,
1272 move |ctx| {
1273 let col = val_col.clone();
1274 FieldFuture::new(async move {
1275 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1276 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1277 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1278 })
1279 },
1280 ));
1281 gql_union = gql_union.possible_type(&variant_obj_name);
1282 variant_objects.push(variant_obj);
1283 }
1284
1285 let col_name = child.column.clone();
1286 let type_col = children.iter()
1287 .find(|f| f.graphql_name == "Type")
1288 .map(|f| f.column.clone())
1289 .unwrap_or_default();
1290 let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1291
1292 let mut field = Field::new(
1293 &child.graphql_name,
1294 TypeRef::named(&union_name),
1295 move |ctx| {
1296 let col = col_name.clone();
1297 let tcol = type_col.clone();
1298 let vars = variants_clone.clone();
1299 FieldFuture::new(async move {
1300 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1301 let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1302 if raw_val.is_null() || raw_val.as_str() == Some("") {
1303 return Ok(None);
1304 }
1305 let type_str = row.get(&tcol)
1306 .and_then(|v| v.as_str())
1307 .unwrap_or("");
1308 let resolved_type = resolve_union_typename(type_str, &vars);
1309 let mut elem = RowMap::new();
1310 elem.insert(col.clone(), raw_val);
1311 Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1312 })
1313 },
1314 );
1315 if let Some(desc) = &child.description {
1316 field = field.description(desc);
1317 }
1318 element_obj = element_obj.field(field);
1319
1320 includes_filter = includes_filter.field(
1321 InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1322 );
1323
1324 union_registrations.push((union_name, gql_union, variant_objects));
1325 }
1326 }
1327 }
1328
1329 for (_, union_type, variant_objs) in union_registrations {
1331 c.extra_objects.extend(variant_objs);
1332 c.extra_unions.push(union_type);
1340 }
1341
1342 let child_columns: Vec<(String, String)> = children.iter()
1344 .map(|f| (f.graphql_name.clone(), f.column.clone()))
1345 .collect();
1346 let element_type_name_clone = element_type_name.clone();
1347
1348 let mut array_field = Field::new(
1349 graphql_name,
1350 TypeRef::named_nn_list_nn(&element_type_name),
1351 move |ctx| {
1352 let cols = child_columns.clone();
1353 let _etype = element_type_name_clone.clone();
1354 FieldFuture::new(async move {
1355 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1356 let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1357 .map(|(gql_name, col)| {
1358 let arr = row.get(col)
1359 .and_then(|v| v.as_array())
1360 .cloned()
1361 .unwrap_or_default();
1362 (gql_name.as_str(), arr)
1363 })
1364 .collect();
1365
1366 let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1367 let mut elements = Vec::with_capacity(len);
1368 for i in 0..len {
1369 let mut elem = RowMap::new();
1370 for (gql_name, arr) in &arrays {
1371 let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1372 elem.insert(gql_name.to_string(), val);
1373 }
1374 for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1376 let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1377 elem.insert(col.clone(), val);
1378 }
1379 elements.push(FieldValue::owned_any(elem));
1380 }
1381 Ok(Some(FieldValue::list(elements)))
1382 })
1383 },
1384 );
1385 if let Some(desc) = description {
1386 array_field = array_field.description(desc);
1387 }
1388 c.record_fields.push(array_field);
1389
1390 c.filter_fields.push(InputValue::new(
1392 graphql_name,
1393 TypeRef::named(&includes_filter_name),
1394 ));
1395
1396 c.extra_objects.push(element_obj);
1397 c.extra_inputs.push(includes_filter);
1398 }
1399 }
1400}
1401
1402fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1404 let fallback = variants.last().map(|v| v.type_name.clone()).unwrap_or_default();
1405 match type_str {
1406 "u8" | "u16" | "u32" | "i32" => {
1407 variants.iter().find(|v| v.field_name == "integer")
1408 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1409 }
1410 "u64" | "u128" | "i64" => {
1411 variants.iter().find(|v| v.field_name == "bigInteger")
1412 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1413 }
1414 "string" => {
1415 variants.iter().find(|v| v.field_name == "string")
1416 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1417 }
1418 "publicKey" | "pubkey" => {
1419 variants.iter().find(|v| v.field_name == "address")
1420 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1421 }
1422 "bool" => {
1423 variants.iter().find(|v| v.field_name == "bool")
1424 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1425 }
1426 _ => fallback,
1427 }
1428}
1429
1430fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1431 match dt {
1432 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
1433 DimType::Int => TypeRef::named(TypeRef::INT),
1434 DimType::Float => TypeRef::named(TypeRef::FLOAT),
1435 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1436 }
1437}
1438
1439fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1440 match dt {
1441 DimType::String => "StringFilter",
1442 DimType::Int => "IntFilter",
1443 DimType::Float => "FloatFilter",
1444 DimType::DateTime => "DateTimeFilter",
1445 DimType::Bool => "BoolFilter",
1446 }
1447}
1448
1449pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1450 match v {
1451 serde_json::Value::Null => Value::Null,
1452 serde_json::Value::Bool(b) => Value::from(b),
1453 serde_json::Value::Number(n) => {
1454 if let Some(i) = n.as_i64() { Value::from(i) }
1455 else if let Some(f) = n.as_f64() { Value::from(f) }
1456 else { Value::from(n.to_string()) }
1457 }
1458 serde_json::Value::String(s) => Value::from(s),
1459 _ => Value::from(v.to_string()),
1460 }
1461}
1462
1463fn build_join_expr(
1466 jd: &crate::cube::definition::JoinDef,
1467 target_cube: &CubeDefinition,
1468 sub_field: &async_graphql::SelectionField<'_>,
1469 network: &str,
1470 join_idx: usize,
1471) -> JoinExpr {
1472 let target_flat = target_cube.flat_dimensions();
1473 let target_table = target_cube.table_for_chain(network);
1474
1475 let mut requested_paths = HashSet::new();
1476 collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1477
1478 let mut selects: Vec<SelectExpr> = target_flat.iter()
1479 .filter(|(path, _)| requested_paths.contains(path))
1480 .map(|(_, dim)| SelectExpr::Column {
1481 column: dim.column.clone(),
1482 alias: None,
1483 })
1484 .collect();
1485
1486 if selects.is_empty() {
1487 selects = target_flat.iter()
1488 .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1489 .collect();
1490 }
1491
1492 let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
1493
1494 let group_by = if is_aggregate {
1495 let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1496 for sel in &selects {
1497 if let SelectExpr::Column { column, .. } = sel {
1498 if !column.contains('(') && !gb.contains(column) {
1499 gb.push(column.clone());
1500 }
1501 }
1502 }
1503 gb
1504 } else {
1505 vec![]
1506 };
1507
1508 JoinExpr {
1509 schema: target_cube.schema.clone(),
1510 table: target_table,
1511 alias: format!("_j{}", join_idx),
1512 conditions: jd.conditions.clone(),
1513 selects,
1514 group_by,
1515 use_final: target_cube.use_final,
1516 is_aggregate,
1517 target_cube: jd.target_cube.clone(),
1518 join_field: sub_field.name().to_string(),
1519 join_type: jd.join_type.clone(),
1520 }
1521}
1522
1523fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1526 match v {
1527 serde_json::Value::String(s) => {
1528 let iso = if s.contains('T') {
1529 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1530 } else {
1531 let replaced = s.replacen(' ', "T", 1);
1532 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1533 };
1534 Value::from(iso)
1535 }
1536 other => json_to_gql_value(other),
1537 }
1538}