1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Builds the row key for a metric alias by prefixing it with two
/// underscores (`count` becomes `__count`).
///
/// Metric field resolvers use this key to look up their value in a result row.
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Builds the key for a dimension-aggregate alias by prefixing it with
/// `__da_` (`price` becomes `__da_price`).
pub fn dim_agg_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 5);
    key.push_str("__da_");
    key.push_str(alias);
    key
}
20
/// Shared, thread-safe async callback used to run a compiled query.
///
/// It is called with the SQL text and its bound values; the returned boxed
/// future resolves to the result rows, or to an error message string.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// Describes one additional argument accepted by a chain-group wrapper field.
#[derive(Debug, Clone)]
pub struct WrapperArg {
    /// Argument name as exposed on the wrapper field.
    pub name: String,
    /// Name of the GraphQL type the argument is declared with (nullable).
    pub type_ref: String,
    /// Description attached to the argument in the schema.
    pub description: String,
    /// Optional list of values. Not read by the schema-building code visible
    /// in this part of the file — presumably the allowed values; TODO confirm.
    pub values: Option<Vec<String>>,
}
43
/// Configuration of one chain group, exposed as a wrapper field on the root query.
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    /// Name used for both the wrapper field and its object type.
    pub name: String,
    /// Group used to select which cubes appear under this wrapper.
    pub group: ChainGroup,
    /// Networks of this group. The first entry serves as the default network,
    /// and all entries become items of the `<name>Network` enum when
    /// `has_network_arg` is set.
    pub networks: Vec<String>,
    /// Whether the wrapper field takes a required `network` argument.
    pub has_network_arg: bool,
    /// Additional arguments declared on the wrapper field.
    pub extra_args: Vec<WrapperArg>,
}
60
/// Hook that rewrites a query's table name for a given chain context.
///
/// It receives the table name produced by the compiler and the
/// [`ChainContext`] of the enclosing wrapper field, and returns the table
/// name to use instead.
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
65
/// Options controlling how [`build_schema`] assembles the GraphQL schema.
pub struct SchemaConfig {
    /// Chain groups to expose; each becomes a wrapper field on the root query.
    pub chain_groups: Vec<ChainGroupConfig>,
    /// Configured root query name.
    /// NOTE(review): `build_schema` as written names the root type `Query`
    /// and does not read this field — confirm whether that is intended.
    pub root_query_name: String,
    /// Fallback stats callback, used by cube resolvers when no
    /// `StatsCallback` is available in the GraphQL context data.
    pub stats_callback: Option<StatsCallback>,
    /// Optional table-name rewrite hook, stored as schema data when set.
    pub table_name_transform: Option<TableNameTransform>,
    /// Extra enum types registered with the schema as-is.
    pub extra_types: Vec<Enum>,
}
78
79impl Default for SchemaConfig {
80 fn default() -> Self {
81 Self {
82 chain_groups: vec![
83 ChainGroupConfig {
84 name: "EVM".to_string(),
85 group: ChainGroup::Evm,
86 networks: vec!["eth".into(), "bsc".into()],
87 has_network_arg: true,
88 extra_args: vec![],
89 },
90 ChainGroupConfig {
91 name: "Solana".to_string(),
92 group: ChainGroup::Solana,
93 networks: vec!["sol".into()],
94 has_network_arg: false,
95 extra_args: vec![],
96 },
97 ChainGroupConfig {
98 name: "Trading".to_string(),
99 group: ChainGroup::Trading,
100 networks: vec!["sol".into(), "eth".into(), "bsc".into()],
101 has_network_arg: false,
102 extra_args: vec![],
103 },
104 ],
105 root_query_name: "ChainStream".to_string(),
106 stats_callback: None,
107 table_name_transform: None,
108 extra_types: vec![],
109 }
110 }
111}
112
/// Per-request context produced by a chain-group wrapper field and read by
/// the cube resolvers nested beneath it.
#[derive(Debug, Clone)]
pub struct ChainContext {
    /// Selected network: the `network` argument when the group takes one,
    /// otherwise the group's default network.
    pub network: String,
    /// Values of the group's extra wrapper arguments, keyed by argument name.
    /// Enum, boolean and string values are stored in their string form.
    pub extra: HashMap<String, String>,
}
120
/// Assembles the dynamic GraphQL schema for all cubes in `registry`.
///
/// Registration order: shared input/enum types, the caller's extra enums, one
/// `<Group>Network` enum per group that takes a network argument, the types of
/// every cube (once per cube name), one wrapper object and root field per
/// chain group, and finally the `_cubeMetadata` field. The registry and the
/// optional table-name transform are attached as schema data.
///
/// * `registry` – cube definitions to expose.
/// * `dialect` – SQL dialect handed to each cube resolver.
/// * `executor` – async callback each cube resolver uses to run its SQL.
/// * `config` – chain groups, stats callback, transform hook and extra enums.
///
/// Returns the finished schema, or the `SchemaError` reported by `finish()`.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    // The root query type is always named "Query"; no mutation or subscription root.
    let mut builder = Schema::build("Query", None, None);

    // Shared types referenced by every cube.
    builder = builder.register(filter_types::build_limit_input());
    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }
    builder = builder.register(
        Enum::new("TimeUnit")
            .description("Time unit for interval bucketing")
            .item(EnumItem::new("seconds"))
            .item(EnumItem::new("minutes"))
            .item(EnumItem::new("hours"))
            .item(EnumItem::new("days"))
            .item(EnumItem::new("weeks"))
            .item(EnumItem::new("months")),
    );
    builder = builder.register(
        InputObject::new("TimeIntervalInput")
            .description("Time bucketing interval for DateTime dimensions")
            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
    );
    builder = builder.register(
        InputObject::new("DimSelectWhere")
            .description("Post-aggregation filter for dimension values (HAVING clause)")
            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
    );

    // Caller-supplied enums (moves `extra_types` out of `config`).
    for extra_enum in config.extra_types {
        builder = builder.register(extra_enum);
    }

    // One `<Group>Network` enum per group that accepts a `network` argument.
    for grp in &config.chain_groups {
        if grp.has_network_arg {
            let enum_name = format!("{}Network", grp.name);
            let mut net_enum = Enum::new(&enum_name)
                .description(format!("{} network selector", grp.name));
            for net in &grp.networks {
                net_enum = net_enum.item(EnumItem::new(net));
            }
            builder = builder.register(net_enum);
        }
    }

    // Register each cube's types once, even if the registry yields the same
    // cube name more than once.
    let mut registered_cubes: HashSet<String> = HashSet::new();
    for cube in registry.cubes() {
        if registered_cubes.contains(&cube.name) { continue; }
        registered_cubes.insert(cube.name.clone());

        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }
        for un in types.unions { builder = builder.register(un); }
    }

    let mut query = Object::new("Query");

    // One wrapper object + root field per chain group.
    for grp in &config.chain_groups {
        let wrapper_type_name = grp.name.clone();

        let mut wrapper_obj = Object::new(&wrapper_type_name);

        // The wrapper exposes every cube that belongs to this group.
        for cube in registry.cubes() {
            if !cube.chain_groups.contains(&grp.group) { continue; }

            let cube_field = build_cube_field(
                cube,
                dialect.clone(),
                executor.clone(),
                config.stats_callback.clone(),
            );
            wrapper_obj = wrapper_obj.field(cube_field);
        }
        builder = builder.register(wrapper_obj);

        // Values captured by the wrapper resolver below. The default network
        // is the group's first network, or an empty string if it has none.
        let default_network = grp.networks.first().cloned().unwrap_or_default();
        let has_net_arg = grp.has_network_arg;
        let net_enum_name = format!("{}Network", grp.name);
        let wrapper_type_for_resolver = wrapper_type_name.clone();
        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();

        let mut wrapper_field = Field::new(
            &wrapper_type_name,
            TypeRef::named_nn(&wrapper_type_name),
            move |ctx| {
                let default = default_network.clone();
                let has_arg = has_net_arg;
                let arg_names = extra_arg_names.clone();
                FieldFuture::new(async move {
                    // Use the `network` argument when the group has one and it
                    // resolves to an enum value; otherwise fall back to the default.
                    let network = if has_arg {
                        ctx.args.try_get("network")
                            .ok()
                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
                            .unwrap_or(default)
                    } else {
                        default
                    };
                    // Capture extra arguments as strings, trying enum, then
                    // boolean, then string; values of other kinds are dropped.
                    let mut extra = HashMap::new();
                    for arg_name in &arg_names {
                        if let Ok(val) = ctx.args.try_get(arg_name) {
                            let resolved = val.enum_name().ok().map(|s| s.to_string())
                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
                                .or_else(|| val.string().ok().map(|s| s.to_string()));
                            if let Some(v) = resolved {
                                extra.insert(arg_name.clone(), v);
                            }
                        }
                    }
                    // The context becomes the parent value of the nested cube fields.
                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
                })
            },
        );

        if grp.has_network_arg {
            wrapper_field = wrapper_field.argument(
                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
                    .description(format!("{} network to query", wrapper_type_for_resolver)),
            );
        }
        for wa in &grp.extra_args {
            wrapper_field = wrapper_field.argument(
                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
                    .description(&wa.description),
            );
        }

        query = query.field(wrapper_field);
    }

    // `_cubeMetadata`: JSON string describing every cube, grouped by chain
    // group. It is rebuilt from a cloned registry on every resolution.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
        .map(|g| (g.name.clone(), g.group.clone()))
        .collect();
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            let groups = metadata_groups.clone();
            FieldFuture::new(async move {
                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
                for (group_name, group_enum) in &groups {
                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
                        .filter(|c| c.chain_groups.contains(group_enum))
                        .map(serialize_cube_metadata)
                        .collect();
                    group_metadata.push(serde_json::json!({
                        "group": group_name,
                        "cubes": cubes_in_group,
                    }));
                }
                // A serialization failure yields an empty string rather than an error.
                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes grouped by chain");
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Cube resolvers fetch the registry from the schema data.
    builder = builder.data(registry);

    if let Some(transform) = config.table_name_transform.clone() {
        builder = builder.data(transform);
    }

    builder.finish()
}
327
328fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
329 serde_json::json!({
330 "name": cube.name,
331 "description": cube.description,
332 "schema": cube.schema,
333 "tablePattern": cube.table_pattern,
334 "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
335 "metrics": cube.metrics.iter().map(|m| {
336 let mut obj = serde_json::json!({
337 "name": m.name,
338 "returnType": format!("{:?}", m.return_type),
339 });
340 if let Some(ref tmpl) = m.expression_template {
341 obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
342 }
343 if let Some(ref desc) = m.description {
344 obj["description"] = serde_json::Value::String(desc.clone());
345 }
346 obj
347 }).collect::<Vec<_>>(),
348 "selectors": cube.selectors.iter().map(|s| {
349 serde_json::json!({
350 "name": s.graphql_name,
351 "column": s.column,
352 "type": format!("{:?}", s.dim_type),
353 })
354 }).collect::<Vec<_>>(),
355 "dimensions": serialize_dims(&cube.dimensions),
356 "joins": cube.joins.iter().map(|j| {
357 serde_json::json!({
358 "field": j.field_name,
359 "target": j.target_cube,
360 "joinType": format!("{:?}", j.join_type),
361 })
362 }).collect::<Vec<_>>(),
363 "tableRoutes": cube.table_routes.iter().map(|r| {
364 serde_json::json!({
365 "schema": r.schema,
366 "tablePattern": r.table_pattern,
367 "availableColumns": r.available_columns,
368 "priority": r.priority,
369 })
370 }).collect::<Vec<_>>(),
371 "defaultLimit": cube.default_limit,
372 "maxLimit": cube.max_limit,
373 })
374}
375
/// Builds the GraphQL field that queries one cube.
///
/// The field is named after the cube and returns a non-null list of
/// `<Cube>Record` values. Its resolver parses the selection into a query IR,
/// adds joins for selected join fields, optionally rewrites the table name,
/// validates and compiles the IR to SQL, runs it through `executor`, reshapes
/// the rows (alias remapping, nesting of joined columns) and reports stats.
///
/// * `cube` – definition the field is generated from.
/// * `dialect` – compiles the validated IR into SQL and bindings.
/// * `executor` – runs the SQL and yields the rows.
/// * `stats_cb` – fallback stats callback, used when the GraphQL context
///   data does not provide a `StatsCallback`.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            // Clone the captured state so each invocation owns what it moves
            // into the async block.
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // The parent is normally the `ChainContext` produced by the
                // wrapper field; without one the network falls back to "sol".
                let chain_ctx = ctx.parent_value
                    .try_downcast_ref::<ChainContext>().ok();
                let network = chain_ctx
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Gather everything the selection set asks for, then let the
                // parser turn it into a query IR.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // Add one join per selected join field whose target cube is
                // registered; unknown targets are skipped silently.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                // The table-name transform applies only when both the hook and
                // a chain context are available.
                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
                    if let Some(cctx) = chain_ctx {
                        ir.table = transform(&ir.table, cctx);
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                // `sql` is cloned because it is needed again for the stats below.
                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Move values from compiled aliases back to their original
                // keys; a value already stored under the original key wins.
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Fold `<join alias>.<column>` entries into one nested object
                // per join, stored under the join's field name.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            // Collect keys first: the row cannot be mutated
                            // while its keys are being iterated.
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // A callback found in the context data takes precedence over
                // the one captured at schema-build time.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                // Each row becomes the parent value of the record's field resolvers.
                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named(format!("{}LimitByInput", cube.name)))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // Each selector is exposed as a top-level shorthand filter argument.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
531
532fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
533 serde_json::Value::Array(dims.iter().map(|d| match d {
534 DimensionNode::Leaf(dim) => {
535 let mut obj = serde_json::json!({
536 "name": dim.graphql_name,
537 "column": dim.column,
538 "type": format!("{:?}", dim.dim_type),
539 });
540 if let Some(desc) = &dim.description {
541 obj["description"] = serde_json::Value::String(desc.clone());
542 }
543 obj
544 },
545 DimensionNode::Group { graphql_name, description, children } => {
546 let mut obj = serde_json::json!({
547 "name": graphql_name,
548 "children": serialize_dims(children),
549 });
550 if let Some(desc) = description {
551 obj["description"] = serde_json::Value::String(desc.clone());
552 }
553 obj
554 },
555 DimensionNode::Array { graphql_name, description, children } => {
556 let fields: Vec<serde_json::Value> = children.iter().map(|f| {
557 let type_value = match &f.field_type {
558 crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
559 "kind": "scalar",
560 "scalarType": format!("{:?}", dt),
561 }),
562 crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
563 "kind": "union",
564 "variants": variants.iter().map(|v| serde_json::json!({
565 "typeName": v.type_name,
566 "fieldName": v.field_name,
567 "sourceType": format!("{:?}", v.source_type),
568 })).collect::<Vec<_>>(),
569 }),
570 };
571 let mut field_obj = serde_json::json!({
572 "name": f.graphql_name,
573 "column": f.column,
574 "type": type_value,
575 });
576 if let Some(desc) = &f.description {
577 field_obj["description"] = serde_json::Value::String(desc.clone());
578 }
579 field_obj
580 }).collect();
581 let mut obj = serde_json::json!({
582 "name": graphql_name,
583 "kind": "array",
584 "fields": fields,
585 });
586 if let Some(desc) = description {
587 obj["description"] = serde_json::Value::String(desc.clone());
588 }
589 obj
590 },
591 }).collect())
592}
593
594fn extract_metric_requests(
598 ctx: &async_graphql::dynamic::ResolverContext,
599 cube: &CubeDefinition,
600) -> Vec<compiler::parser::MetricRequest> {
601 let mut requests = Vec::new();
602
603 for sub_field in ctx.ctx.field().selection_set() {
604 let name = sub_field.name();
605 if !cube.has_metric(name) {
606 continue;
607 }
608
609 let args = match sub_field.arguments() {
610 Ok(args) => args,
611 Err(_) => continue,
612 };
613
614 let of_dimension = args
615 .iter()
616 .find(|(k, _)| k.as_str() == "of")
617 .and_then(|(_, v)| match v {
618 async_graphql::Value::Enum(e) => Some(e.to_string()),
619 async_graphql::Value::String(s) => Some(s.clone()),
620 _ => None,
621 })
622 .unwrap_or_else(|| "*".to_string());
623
624 let select_where_value = args
625 .iter()
626 .find(|(k, _)| k.as_str() == "selectWhere")
627 .map(|(_, v)| v.clone());
628
629 let condition_filter = args
630 .iter()
631 .find(|(k, _)| k.as_str() == "if")
632 .and_then(|(_, v)| {
633 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
634 .and_then(|f| if f.is_empty() { None } else { Some(f) })
635 });
636
637 let gql_alias = sub_field.alias().unwrap_or(name).to_string();
638 requests.push(compiler::parser::MetricRequest {
639 function: name.to_string(),
640 alias: gql_alias,
641 of_dimension,
642 select_where_value,
643 condition_filter,
644 });
645 }
646
647 requests
648}
649
650fn extract_requested_fields(
651 ctx: &async_graphql::dynamic::ResolverContext,
652 cube: &CubeDefinition,
653) -> HashSet<String> {
654 let mut fields = HashSet::new();
655 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
656 fields
657}
658
659fn collect_selection_paths(
660 field: &async_graphql::SelectionField<'_>,
661 prefix: &str,
662 out: &mut HashSet<String>,
663 metrics: &[MetricDef],
664) {
665 for sub in field.selection_set() {
666 let name = sub.name();
667 if metrics.iter().any(|m| m.name == name) {
668 continue;
669 }
670 let path = if prefix.is_empty() {
671 name.to_string()
672 } else {
673 format!("{prefix}_{name}")
674 };
675 let has_children = sub.selection_set().next().is_some();
676 if has_children {
677 collect_selection_paths(&sub, &path, out, metrics);
678 } else {
679 out.insert(path);
680 }
681 }
682}
683
/// List of `(aliased field path, column)` pairs for aliased dimension fields.
pub type FieldAliasMap = Vec<(String, String)>;
687
/// A `quantile` field selected on a cube record.
#[derive(Debug, Clone, PartialEq)]
pub struct QuantileRequest {
    /// GraphQL alias of the field, or `quantile` when it has none.
    pub alias: String,
    /// Value of the `of` argument, or `"*"` when it is absent or unusable.
    pub of_dimension: String,
    /// Value of the `level` argument, or `0.5` when it is absent or unusable.
    pub level: f64,
}
694
/// A `calculate` field selected on a cube record.
#[derive(Debug, Clone, PartialEq)]
pub struct CalculateRequest {
    /// GraphQL alias of the field, or `calculate` when it has none.
    pub alias: String,
    /// Value of the field's `expression` string argument.
    pub expression: String,
}
700
/// A dimension field selected with an `interval` argument.
#[derive(Debug, Clone, PartialEq)]
pub struct TimeIntervalRequest {
    /// Underscore-joined path of the field within the record.
    pub field_path: String,
    /// GraphQL alias of the field, or its own name when it has none.
    pub graphql_alias: String,
    /// Column of the dimension found at `field_path`.
    pub column: String,
    /// Value of the interval's `in` entry (enum or string).
    pub unit: String,
    /// Value of the interval's `count` entry.
    pub count: i64,
}
710
/// A dimension field selected with a `maximum` or `minimum` argument.
pub struct DimAggRequest {
    /// Underscore-joined path of the field within the record.
    pub field_path: String,
    /// GraphQL alias of the field, or `field_path` when it has none.
    pub graphql_alias: String,
    /// Column of the dimension found at `field_path`.
    pub value_column: String,
    /// `ArgMax` for a `maximum` argument, `ArgMin` for a `minimum` argument.
    pub agg_type: DimAggType,
    /// Column of the dimension named by the `maximum`/`minimum` argument.
    pub compare_column: String,
    /// Parsed `if` argument; `None` when absent, unparsable or empty.
    pub condition_filter: Option<FilterNode>,
    /// Raw value of the `selectWhere` argument, if given.
    pub select_where_value: Option<async_graphql::Value>,
}
722
723fn extract_quantile_requests(
724 ctx: &async_graphql::dynamic::ResolverContext,
725) -> Vec<QuantileRequest> {
726 let mut requests = Vec::new();
727 for sub_field in ctx.ctx.field().selection_set() {
728 if sub_field.name() != "quantile" { continue; }
729 let args = match sub_field.arguments() {
730 Ok(a) => a,
731 Err(_) => continue,
732 };
733 let of_dim = args.iter()
734 .find(|(k, _)| k.as_str() == "of")
735 .and_then(|(_, v)| match v {
736 async_graphql::Value::Enum(e) => Some(e.to_string()),
737 async_graphql::Value::String(s) => Some(s.clone()),
738 _ => None,
739 })
740 .unwrap_or_else(|| "*".to_string());
741 let level = args.iter()
742 .find(|(k, _)| k.as_str() == "level")
743 .and_then(|(_, v)| match v {
744 async_graphql::Value::Number(n) => n.as_f64(),
745 _ => None,
746 })
747 .unwrap_or(0.5);
748 let alias = sub_field.alias().unwrap_or("quantile").to_string();
749 requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
750 }
751 requests
752}
753
754fn extract_field_aliases(
755 ctx: &async_graphql::dynamic::ResolverContext,
756 cube: &CubeDefinition,
757) -> FieldAliasMap {
758 let flat = cube.flat_dimensions();
759 let mut aliases = Vec::new();
760 collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
761 aliases
762}
763
764fn collect_field_aliases(
765 field: &async_graphql::SelectionField<'_>,
766 prefix: &str,
767 flat: &[(String, crate::cube::definition::Dimension)],
768 metrics: &[MetricDef],
769 out: &mut FieldAliasMap,
770) {
771 for sub in field.selection_set() {
772 let name = sub.name();
773 if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
774 continue;
775 }
776 let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
777 let has_children = sub.selection_set().next().is_some();
778 if has_children {
779 collect_field_aliases(&sub, &path, flat, metrics, out);
780 } else if let Some(alias) = sub.alias() {
781 if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
782 let alias_path = if prefix.is_empty() {
783 alias.to_string()
784 } else {
785 format!("{prefix}_{alias}")
786 };
787 out.push((alias_path, dim.column.clone()));
788 }
789 }
790 }
791}
792
793fn extract_calculate_requests(
794 ctx: &async_graphql::dynamic::ResolverContext,
795) -> Vec<CalculateRequest> {
796 let mut requests = Vec::new();
797 for sub_field in ctx.ctx.field().selection_set() {
798 if sub_field.name() != "calculate" { continue; }
799 let args = match sub_field.arguments() {
800 Ok(a) => a,
801 Err(_) => continue,
802 };
803 let expression = args.iter()
804 .find(|(k, _)| k.as_str() == "expression")
805 .and_then(|(_, v)| match v {
806 async_graphql::Value::String(s) => Some(s.clone()),
807 _ => None,
808 });
809 if let Some(expr) = expression {
810 let alias = sub_field.alias().unwrap_or("calculate").to_string();
811 requests.push(CalculateRequest { alias, expression: expr });
812 }
813 }
814 requests
815}
816
817fn extract_dim_agg_requests(
818 ctx: &async_graphql::dynamic::ResolverContext,
819 cube: &CubeDefinition,
820) -> Vec<DimAggRequest> {
821 let flat = cube.flat_dimensions();
822 let mut requests = Vec::new();
823 collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
824 requests
825}
826
827fn collect_dim_agg_paths(
828 field: &async_graphql::SelectionField<'_>,
829 prefix: &str,
830 flat: &[(String, crate::cube::definition::Dimension)],
831 metrics: &[MetricDef],
832 cube: &CubeDefinition,
833 out: &mut Vec<DimAggRequest>,
834) {
835 for sub in field.selection_set() {
836 let name = sub.name();
837 if metrics.iter().any(|m| m.name == name) {
838 continue;
839 }
840 let path = if prefix.is_empty() {
841 name.to_string()
842 } else {
843 format!("{prefix}_{name}")
844 };
845 let has_children = sub.selection_set().next().is_some();
846 if has_children {
847 collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
848 } else {
849 let args = match sub.arguments() {
850 Ok(a) => a,
851 Err(_) => continue,
852 };
853 let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
854 let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
855
856 let (agg_type, compare_path) = if let Some((_, v)) = max_val {
857 let cp = match v {
858 async_graphql::Value::Enum(e) => e.to_string(),
859 async_graphql::Value::String(s) => s.clone(),
860 _ => continue,
861 };
862 (DimAggType::ArgMax, cp)
863 } else if let Some((_, v)) = min_val {
864 let cp = match v {
865 async_graphql::Value::Enum(e) => e.to_string(),
866 async_graphql::Value::String(s) => s.clone(),
867 _ => continue,
868 };
869 (DimAggType::ArgMin, cp)
870 } else {
871 continue;
872 };
873
874 let value_column = flat.iter()
875 .find(|(p, _)| p == &path)
876 .map(|(_, dim)| dim.column.clone());
877 let compare_column = flat.iter()
878 .find(|(p, _)| p == &compare_path)
879 .map(|(_, dim)| dim.column.clone());
880
881 if let (Some(vc), Some(cc)) = (value_column, compare_column) {
882 let condition_filter = args.iter()
883 .find(|(k, _)| k.as_str() == "if")
884 .and_then(|(_, v)| {
885 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
886 .and_then(|f| if f.is_empty() { None } else { Some(f) })
887 });
888 let select_where_value = args.iter()
889 .find(|(k, _)| k.as_str() == "selectWhere")
890 .map(|(_, v)| v.clone());
891
892 let gql_alias = sub.alias()
893 .map(|a| a.to_string())
894 .unwrap_or_else(|| path.clone());
895 out.push(DimAggRequest {
896 field_path: path,
897 graphql_alias: gql_alias,
898 value_column: vc,
899 agg_type,
900 compare_column: cc,
901 condition_filter,
902 select_where_value,
903 });
904 }
905 }
906 }
907}
908
909fn extract_time_interval_requests(
910 ctx: &async_graphql::dynamic::ResolverContext,
911 cube: &CubeDefinition,
912) -> Vec<TimeIntervalRequest> {
913 let flat = cube.flat_dimensions();
914 let mut requests = Vec::new();
915 collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
916 requests
917}
918
919fn collect_time_interval_paths(
920 field: &async_graphql::SelectionField<'_>,
921 prefix: &str,
922 flat: &[(String, crate::cube::definition::Dimension)],
923 metrics: &[MetricDef],
924 out: &mut Vec<TimeIntervalRequest>,
925) {
926 for sub in field.selection_set() {
927 let name = sub.name();
928 if metrics.iter().any(|m| m.name == name) { continue; }
929 let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
930 let has_children = sub.selection_set().next().is_some();
931 if has_children {
932 collect_time_interval_paths(&sub, &path, flat, metrics, out);
933 } else {
934 let args = match sub.arguments() {
935 Ok(a) => a,
936 Err(_) => continue,
937 };
938 let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
939 if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
940 let unit = obj.get("in")
941 .and_then(|v| match v {
942 async_graphql::Value::Enum(e) => Some(e.to_string()),
943 async_graphql::Value::String(s) => Some(s.clone()),
944 _ => None,
945 });
946 let count = obj.get("count")
947 .and_then(|v| match v {
948 async_graphql::Value::Number(n) => n.as_i64(),
949 _ => None,
950 });
951 if let (Some(unit), Some(count)) = (unit, count) {
952 if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
953 let gql_alias = sub.alias().unwrap_or(name).to_string();
954 out.push(TimeIntervalRequest {
955 field_path: path,
956 graphql_alias: gql_alias,
957 column: dim.column.clone(),
958 unit,
959 count,
960 });
961 }
962 }
963 }
964 }
965 }
966}
967
/// GraphQL types generated for one cube, grouped by kind so the schema
/// builder can register each of them.
struct CubeTypes {
    /// Object types to register.
    objects: Vec<Object>,
    /// Input object types to register.
    inputs: Vec<InputObject>,
    /// Enum types to register.
    enums: Vec<Enum>,
    /// Union types to register.
    unions: Vec<Union>,
}
978
979fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
980 let record_name = format!("{}Record", cube.name);
981 let filter_name = format!("{}Filter", cube.name);
982 let compare_enum_name = format!("{}CompareFields", cube.name);
983
984 let flat_dims = cube.flat_dimensions();
985
986 let mut compare_enum = Enum::new(&compare_enum_name)
987 .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
988 for (path, _) in &flat_dims {
989 compare_enum = compare_enum.item(EnumItem::new(path));
990 }
991
992 let mut record_fields: Vec<Field> = Vec::new();
993 let mut filter_fields: Vec<InputValue> = Vec::new();
994 let mut extra_objects: Vec<Object> = Vec::new();
995 let mut extra_inputs: Vec<InputObject> = Vec::new();
996 let mut extra_unions: Vec<Union> = Vec::new();
997
998 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
999 .description("OR combinator — matches if any sub-filter matches"));
1000
1001 {
1002 let mut collector = DimCollector {
1003 cube_name: &cube.name,
1004 compare_enum_name: &compare_enum_name,
1005 filter_name: &filter_name,
1006 record_fields: &mut record_fields,
1007 filter_fields: &mut filter_fields,
1008 extra_objects: &mut extra_objects,
1009 extra_inputs: &mut extra_inputs,
1010 extra_unions: &mut extra_unions,
1011 };
1012 for node in &cube.dimensions {
1013 collect_dimension_types(node, "", &mut collector);
1014 }
1015 }
1016
1017 let mut metric_enums: Vec<Enum> = Vec::new();
1018 let builtin_descs: std::collections::HashMap<&str, &str> = [
1019 ("count", "Count of rows or distinct values"),
1020 ("sum", "Sum of values"),
1021 ("avg", "Average of values"),
1022 ("min", "Minimum value"),
1023 ("max", "Maximum value"),
1024 ("uniq", "Count of unique (distinct) values"),
1025 ].into_iter().collect();
1026
1027 for metric in &cube.metrics {
1028 let metric_name = &metric.name;
1029 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1030
1031 if metric.supports_where {
1032 extra_inputs.push(
1033 InputObject::new(&select_where_name)
1034 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1035 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1036 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1037 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1038 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1039 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1040 );
1041 }
1042
1043 let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1044 let mut of_enum = Enum::new(&of_enum_name)
1045 .description(format!("Dimension to apply {} aggregation on", metric_name));
1046 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1047 metric_enums.push(of_enum);
1048
1049 let metric_name_clone = metric_name.clone();
1050 let return_type_ref = dim_type_to_typeref(&metric.return_type);
1051 let metric_desc = metric.description.as_deref()
1052 .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1053 .unwrap_or("Aggregate metric");
1054
1055 let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1056 let default_name = metric_name_clone.clone();
1057 FieldFuture::new(async move {
1058 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1059 let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1060 let key = metric_key(alias);
1061 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1062 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1063 })
1064 })
1065 .description(metric_desc)
1066 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1067 .description("Dimension to aggregate on (default: all rows)"));
1068
1069 if metric.supports_where {
1070 metric_field = metric_field
1071 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1072 .description("Post-aggregation filter (HAVING)"))
1073 .argument(InputValue::new("if", TypeRef::named(&filter_name))
1074 .description("Conditional filter for this metric"));
1075 }
1076
1077 record_fields.push(metric_field);
1078 }
1079
1080 {
1082 let of_enum_name = format!("{}_quantile_Of", cube.name);
1083 let mut of_enum = Enum::new(&of_enum_name)
1084 .description(format!("Dimension to apply quantile on for {}", cube.name));
1085 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1086 metric_enums.push(of_enum);
1087
1088 let of_enum_for_closure = of_enum_name.clone();
1089 let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1090 FieldFuture::new(async move {
1091 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1092 let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1093 let key = metric_key(alias);
1094 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1095 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1096 })
1097 })
1098 .description("Compute a quantile (percentile) of a dimension")
1099 .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1100 .description("Dimension to compute quantile on"))
1101 .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1102 .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1103 record_fields.push(quantile_field);
1104 }
1105
1106 {
1108 let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1109 FieldFuture::new(async move {
1110 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1111 let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1112 let key = metric_key(alias);
1113 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1114 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1115 })
1116 })
1117 .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1118 .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1119 .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1120 record_fields.push(calculate_field);
1121 }
1122
1123 for jd in &cube.joins {
1125 let target_record_name = format!("{}Record", jd.target_cube);
1126 let field_name_owned = jd.field_name.clone();
1127 let mut join_field = Field::new(
1128 &jd.field_name,
1129 TypeRef::named(&target_record_name),
1130 move |ctx| {
1131 let field_name = field_name_owned.clone();
1132 FieldFuture::new(async move {
1133 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1134 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1135 let sub_row: RowMap = obj.iter()
1136 .map(|(k, v)| (k.clone(), v.clone()))
1137 .collect();
1138 Ok(Some(FieldValue::owned_any(sub_row)))
1139 } else {
1140 Ok(Some(FieldValue::value(Value::Null)))
1141 }
1142 })
1143 },
1144 );
1145 if let Some(desc) = &jd.description {
1146 join_field = join_field.description(desc);
1147 }
1148 record_fields.push(join_field);
1149 }
1150
1151 let mut record = Object::new(&record_name);
1152 for f in record_fields { record = record.field(f); }
1153
1154 let mut filter = InputObject::new(&filter_name)
1155 .description(format!("Filter conditions for {} query", cube.name));
1156 for f in filter_fields { filter = filter.field(f); }
1157
1158 let orderby_input_name = format!("{}OrderByInput", cube.name);
1159 let orderby_input = InputObject::new(&orderby_input_name)
1160 .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1161 .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1162 .description("Sort descending by this field"))
1163 .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1164 .description("Sort ascending by this field"))
1165 .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1166 .description("Sort descending by computed/aggregated field name"))
1167 .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1168 .description("Sort ascending by computed/aggregated field name"));
1169
1170 let limitby_input_name = format!("{}LimitByInput", cube.name);
1171 let limitby_input = InputObject::new(&limitby_input_name)
1172 .description(format!("Per-group row limit for {}", cube.name))
1173 .field(InputValue::new("by", TypeRef::named_nn(&compare_enum_name))
1174 .description("Dimension field to group by"))
1175 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
1176 .description("Maximum rows per group"))
1177 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
1178 .description("Rows to skip per group"));
1179
1180 let mut objects = vec![record]; objects.extend(extra_objects);
1181 let mut inputs = vec![filter, orderby_input, limitby_input]; inputs.extend(extra_inputs);
1182 let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1183
1184 CubeTypes { objects, inputs, enums, unions: extra_unions }
1185}
1186
1187struct DimCollector<'a> {
1188 cube_name: &'a str,
1189 compare_enum_name: &'a str,
1190 filter_name: &'a str,
1191 record_fields: &'a mut Vec<Field>,
1192 filter_fields: &'a mut Vec<InputValue>,
1193 extra_objects: &'a mut Vec<Object>,
1194 extra_inputs: &'a mut Vec<InputObject>,
1195 extra_unions: &'a mut Vec<Union>,
1196}
1197
1198fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1199 match node {
1200 DimensionNode::Leaf(dim) => {
1201 let col = dim.column.clone();
1202 let is_datetime = dim.dim_type == DimType::DateTime;
1203 let compare_enum = c.compare_enum_name.to_string();
1204 let cube_filter = c.filter_name.to_string();
1205 let mut leaf_field = Field::new(
1206 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1207 move |ctx| {
1208 let col = col.clone();
1209 FieldFuture::new(async move {
1210 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1211 let has_interval = ctx.args.try_get("interval").is_ok();
1212 let has_max = ctx.args.try_get("maximum").is_ok();
1213 let has_min = ctx.args.try_get("minimum").is_ok();
1214 let key = if has_interval || has_max || has_min {
1215 let name = ctx.ctx.field().alias().unwrap_or(&col);
1216 dim_agg_key(name)
1217 } else {
1218 col.clone()
1219 };
1220 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1221 let gql_val = if is_datetime {
1222 json_to_gql_datetime(val)
1223 } else {
1224 json_to_gql_value(val)
1225 };
1226 Ok(Some(FieldValue::value(gql_val)))
1227 })
1228 },
1229 );
1230 if let Some(desc) = &dim.description {
1231 leaf_field = leaf_field.description(desc);
1232 }
1233 leaf_field = leaf_field
1234 .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1235 .description("Return value from row where compare field is maximum (argMax)"))
1236 .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1237 .description("Return value from row where compare field is minimum (argMin)"))
1238 .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1239 .description("Conditional filter for aggregation"))
1240 .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1241 .description("Post-aggregation value filter (HAVING)"));
1242 if is_datetime {
1243 leaf_field = leaf_field
1244 .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1245 .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1246 }
1247 c.record_fields.push(leaf_field);
1250 if matches!(dim.dim_type, DimType::Bool) {
1251 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(TypeRef::BOOLEAN)));
1252 } else {
1253 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1254 }
1255 }
1256 DimensionNode::Group { graphql_name, description, children } => {
1257 let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1258 let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1259 let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1260
1261 let mut child_record_fields: Vec<Field> = Vec::new();
1262 let mut child_filter_fields: Vec<InputValue> = Vec::new();
1263 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1264
1265 let mut child_collector = DimCollector {
1266 cube_name: c.cube_name,
1267 compare_enum_name: c.compare_enum_name,
1268 filter_name: c.filter_name,
1269 record_fields: &mut child_record_fields,
1270 filter_fields: &mut child_filter_fields,
1271 extra_objects: c.extra_objects,
1272 extra_inputs: c.extra_inputs,
1273 extra_unions: c.extra_unions,
1274 };
1275 for child in children {
1276 collect_dimension_types(child, &new_prefix, &mut child_collector);
1277 }
1278
1279 let mut nested_record = Object::new(&nested_record_name);
1280 for f in child_record_fields { nested_record = nested_record.field(f); }
1281
1282 let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1283 let mut nested_filter = InputObject::new(&nested_filter_name)
1284 .description(nested_filter_desc);
1285 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1286
1287 let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1288 FieldFuture::new(async move {
1289 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1290 Ok(Some(FieldValue::owned_any(row.clone())))
1291 })
1292 });
1293 if let Some(desc) = description {
1294 group_field = group_field.description(desc);
1295 }
1296 c.record_fields.push(group_field);
1297 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1298 c.extra_objects.push(nested_record);
1299 c.extra_inputs.push(nested_filter);
1300 }
1301 DimensionNode::Array { graphql_name, description, children } => {
1302 let full_path = if prefix.is_empty() {
1303 graphql_name.clone()
1304 } else {
1305 format!("{prefix}_{graphql_name}")
1306 };
1307 let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1308 let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1309
1310 let mut element_obj = Object::new(&element_type_name);
1311 let mut includes_filter = InputObject::new(&includes_filter_name)
1312 .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1313
1314 let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1315
1316 for child in children {
1317 match &child.field_type {
1318 crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1319 let col_name = child.column.clone();
1320 let is_datetime = *dt == DimType::DateTime;
1321 let mut field = Field::new(
1322 &child.graphql_name,
1323 dim_type_to_typeref(dt),
1324 move |ctx| {
1325 let col = col_name.clone();
1326 FieldFuture::new(async move {
1327 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1328 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1329 let gql_val = if is_datetime {
1330 json_to_gql_datetime(val)
1331 } else {
1332 json_to_gql_value(val)
1333 };
1334 Ok(Some(FieldValue::value(gql_val)))
1335 })
1336 },
1337 );
1338 if let Some(desc) = &child.description {
1339 field = field.description(desc);
1340 }
1341 element_obj = element_obj.field(field);
1342 includes_filter = includes_filter.field(
1343 InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1344 );
1345 }
1346 crate::cube::definition::ArrayFieldType::Union(variants) => {
1347 let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1348
1349 let mut gql_union = Union::new(&union_name);
1350 let mut variant_objects = Vec::new();
1351
1352 for v in variants {
1353 let variant_obj_name = v.type_name.clone();
1354 let field_name = v.field_name.clone();
1355 let source_type = v.source_type.clone();
1356 let type_ref = dim_type_to_typeref(&source_type);
1357
1358 let val_col = child.column.clone();
1359 let variant_obj = Object::new(&variant_obj_name)
1360 .field(Field::new(
1361 &field_name,
1362 type_ref,
1363 move |ctx| {
1364 let col = val_col.clone();
1365 FieldFuture::new(async move {
1366 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1367 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1368 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1369 })
1370 },
1371 ));
1372 gql_union = gql_union.possible_type(&variant_obj_name);
1373 variant_objects.push(variant_obj);
1374 }
1375
1376 let col_name = child.column.clone();
1377 let type_col = children.iter()
1378 .find(|f| f.graphql_name == "Type")
1379 .map(|f| f.column.clone())
1380 .unwrap_or_default();
1381 let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1382
1383 let mut field = Field::new(
1384 &child.graphql_name,
1385 TypeRef::named(&union_name),
1386 move |ctx| {
1387 let col = col_name.clone();
1388 let tcol = type_col.clone();
1389 let vars = variants_clone.clone();
1390 FieldFuture::new(async move {
1391 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1392 let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1393 if raw_val.is_null() || raw_val.as_str() == Some("") {
1394 return Ok(None);
1395 }
1396 let type_str = row.get(&tcol)
1397 .and_then(|v| v.as_str())
1398 .unwrap_or("");
1399 let resolved_type = resolve_union_typename(type_str, &vars);
1400 let mut elem = RowMap::new();
1401 elem.insert(col.clone(), raw_val);
1402 Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1403 })
1404 },
1405 );
1406 if let Some(desc) = &child.description {
1407 field = field.description(desc);
1408 }
1409 element_obj = element_obj.field(field);
1410
1411 includes_filter = includes_filter.field(
1412 InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1413 );
1414
1415 union_registrations.push((union_name, gql_union, variant_objects));
1416 }
1417 }
1418 }
1419
1420 for (_, union_type, variant_objs) in union_registrations {
1422 c.extra_objects.extend(variant_objs);
1423 c.extra_unions.push(union_type);
1431 }
1432
1433 let child_columns: Vec<(String, String)> = children.iter()
1435 .map(|f| (f.graphql_name.clone(), f.column.clone()))
1436 .collect();
1437 let element_type_name_clone = element_type_name.clone();
1438
1439 let mut array_field = Field::new(
1440 graphql_name,
1441 TypeRef::named_nn_list_nn(&element_type_name),
1442 move |ctx| {
1443 let cols = child_columns.clone();
1444 let _etype = element_type_name_clone.clone();
1445 FieldFuture::new(async move {
1446 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1447 let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1448 .map(|(gql_name, col)| {
1449 let arr = row.get(col)
1450 .and_then(|v| v.as_array())
1451 .cloned()
1452 .unwrap_or_default();
1453 (gql_name.as_str(), arr)
1454 })
1455 .collect();
1456
1457 let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1458 let mut elements = Vec::with_capacity(len);
1459 for i in 0..len {
1460 let mut elem = RowMap::new();
1461 for (gql_name, arr) in &arrays {
1462 let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1463 elem.insert(gql_name.to_string(), val);
1464 }
1465 for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1467 let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1468 elem.insert(col.clone(), val);
1469 }
1470 elements.push(FieldValue::owned_any(elem));
1471 }
1472 Ok(Some(FieldValue::list(elements)))
1473 })
1474 },
1475 );
1476 if let Some(desc) = description {
1477 array_field = array_field.description(desc);
1478 }
1479 c.record_fields.push(array_field);
1480
1481 let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
1485 let wrapper_filter = InputObject::new(&wrapper_filter_name)
1486 .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
1487 .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
1488 .description("Match rows where at least one array element satisfies all conditions"));
1489 c.extra_inputs.push(wrapper_filter);
1490
1491 c.filter_fields.push(InputValue::new(
1492 graphql_name,
1493 TypeRef::named(&wrapper_filter_name),
1494 ));
1495
1496 c.extra_objects.push(element_obj);
1497 c.extra_inputs.push(includes_filter);
1498 }
1499 }
1500}
1501
1502fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1508 for v in variants {
1509 if v.source_type_names.iter().any(|s| s == type_str) {
1510 return v.type_name.clone();
1511 }
1512 }
1513 variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1514}
1515
1516fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1517 match dt {
1518 DimType::String | DimType::DateTime | DimType::Decimal => TypeRef::named(TypeRef::STRING),
1519 DimType::Int => TypeRef::named(TypeRef::INT),
1520 DimType::Float => TypeRef::named(TypeRef::FLOAT),
1521 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1522 }
1523}
1524
1525fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1526 match dt {
1527 DimType::String => "StringFilter",
1528 DimType::Int => "IntFilter",
1529 DimType::Float => "FloatFilter",
1530 DimType::Decimal => "DecimalFilter",
1531 DimType::DateTime => "DateTimeFilter",
1532 DimType::Bool => "BoolFilter",
1533 }
1534}
1535
1536pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1537 match v {
1538 serde_json::Value::Null => Value::Null,
1539 serde_json::Value::Bool(b) => Value::from(b),
1540 serde_json::Value::Number(n) => {
1541 if let Some(i) = n.as_i64() { Value::from(i) }
1542 else if let Some(f) = n.as_f64() { Value::from(f) }
1543 else { Value::from(n.to_string()) }
1544 }
1545 serde_json::Value::String(s) => Value::from(s),
1546 _ => Value::from(v.to_string()),
1547 }
1548}
1549
1550fn build_join_expr(
1553 jd: &crate::cube::definition::JoinDef,
1554 target_cube: &CubeDefinition,
1555 sub_field: &async_graphql::SelectionField<'_>,
1556 network: &str,
1557 join_idx: usize,
1558) -> JoinExpr {
1559 let target_flat = target_cube.flat_dimensions();
1560 let target_table = target_cube.table_for_chain(network);
1561
1562 let mut requested_paths = HashSet::new();
1563 collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1564
1565 let mut selects: Vec<SelectExpr> = target_flat.iter()
1566 .filter(|(path, _)| requested_paths.contains(path))
1567 .map(|(_, dim)| SelectExpr::Column {
1568 column: dim.column.clone(),
1569 alias: None,
1570 })
1571 .collect();
1572
1573 if selects.is_empty() {
1574 selects = target_flat.iter()
1575 .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1576 .collect();
1577 }
1578
1579 let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
1580
1581 let group_by = if is_aggregate {
1582 let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1583 for sel in &selects {
1584 if let SelectExpr::Column { column, .. } = sel {
1585 if !column.contains('(') && !gb.contains(column) {
1586 gb.push(column.clone());
1587 }
1588 }
1589 }
1590 gb
1591 } else {
1592 vec![]
1593 };
1594
1595 JoinExpr {
1596 schema: target_cube.schema.clone(),
1597 table: target_table,
1598 alias: format!("_j{}", join_idx),
1599 conditions: jd.conditions.clone(),
1600 selects,
1601 group_by,
1602 use_final: target_cube.use_final,
1603 is_aggregate,
1604 target_cube: jd.target_cube.clone(),
1605 join_field: sub_field.name().to_string(),
1606 join_type: jd.join_type.clone(),
1607 }
1608}
1609
1610fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1613 match v {
1614 serde_json::Value::String(s) => {
1615 let iso = if s.contains('T') {
1616 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1617 } else {
1618 let replaced = s.replacen(' ', "T", 1);
1619 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1620 };
1621 Value::from(iso)
1622 }
1623 other => json_to_gql_value(other),
1624 }
1625}