1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr, is_aggregate_expr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Row-map key under which a metric's aggregated value is stored.
///
/// Metric results are prefixed with `__` so they cannot collide with
/// dimension column names in the same result row.
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Row-map key under which a dimension-aggregation (argMax/argMin) value is
/// stored; the `__da_` prefix keeps it distinct from both plain dimensions
/// and `__`-prefixed metric keys.
pub fn dim_agg_key(alias: &str) -> String {
    ["__da_", alias].concat()
}
20
/// Async callback that executes a SQL string with positional bindings and
/// returns the result rows. Supplied by the host application so this crate
/// stays database-driver agnostic.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// Extra GraphQL argument exposed on a chain-group wrapper field
/// (in addition to the optional `network` argument).
#[derive(Debug, Clone)]
pub struct WrapperArg {
    // GraphQL argument name; also the key under which its value is stored
    // in `ChainContext.extra`.
    pub name: String,
    // Name of the (already registered) GraphQL input type for the argument.
    pub type_ref: String,
    // Human-readable description shown in the schema.
    pub description: String,
    // Optional fixed set of allowed values.
    // NOTE(review): not consumed anywhere in this file — presumably used by
    // enum registration elsewhere; confirm before relying on it.
    pub values: Option<Vec<String>>,
}
43
/// Configuration for one chain-group wrapper object (e.g. `EVM`, `Solana`)
/// exposed as a field on the root query.
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    // Wrapper object/field name on the root query (e.g. "EVM").
    pub name: String,
    // Which cube chain group this wrapper exposes.
    pub group: ChainGroup,
    // Networks available in this group; the first entry is the default
    // when no `network` argument is given.
    pub networks: Vec<String>,
    // Whether the wrapper field takes a `network` argument (and gets a
    // per-group network enum registered for it).
    pub has_network_arg: bool,
    // Additional arguments surfaced on the wrapper field; their resolved
    // values end up in `ChainContext.extra`.
    pub extra_args: Vec<WrapperArg>,
    // Override for the generated network enum's type name; defaults to
    // "{name}Network".
    pub network_enum_name: Option<String>,
}
64
/// Hook that rewrites a physical table name using the resolved chain context
/// (e.g. substituting the selected network into a table pattern).
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
69
/// Top-level configuration for `build_schema`.
pub struct SchemaConfig {
    // Chain-group wrappers to expose on the root query object.
    pub chain_groups: Vec<ChainGroupConfig>,
    // Intended root query type name.
    // NOTE(review): not read by `build_schema` in this file — the root is
    // hard-coded to "Query"; confirm whether this should be wired in.
    pub root_query_name: String,
    // Fallback per-query stats callback (schema data takes precedence).
    pub stats_callback: Option<StatsCallback>,
    // Optional table-name rewrite hook, stored in schema data.
    pub table_name_transform: Option<TableNameTransform>,
    // Extra enum types to register (e.g. types referenced by wrapper
    // `extra_args`).
    pub extra_types: Vec<Enum>,
}
82
83impl Default for SchemaConfig {
84 fn default() -> Self {
85 Self {
86 chain_groups: vec![
87 ChainGroupConfig {
88 name: "EVM".to_string(),
89 group: ChainGroup::Evm,
90 networks: vec!["eth".into(), "bsc".into()],
91 has_network_arg: true,
92 extra_args: vec![],
93 network_enum_name: None,
94 },
95 ChainGroupConfig {
96 name: "Solana".to_string(),
97 group: ChainGroup::Solana,
98 networks: vec!["sol".into()],
99 has_network_arg: false,
100 extra_args: vec![],
101 network_enum_name: None,
102 },
103 ChainGroupConfig {
104 name: "Trading".to_string(),
105 group: ChainGroup::Trading,
106 networks: vec!["sol".into(), "eth".into(), "bsc".into()],
107 has_network_arg: false,
108 extra_args: vec![],
109 network_enum_name: None,
110 },
111 ],
112 root_query_name: "ChainStream".to_string(),
113 stats_callback: None,
114 table_name_transform: None,
115 extra_types: vec![],
116 }
117 }
118}
119
/// Per-request context produced by a chain-group wrapper resolver and read
/// by child cube-field resolvers (via `parent_value` downcast).
pub struct ChainContext {
    // Selected network (from the `network` argument, or the group default).
    pub network: String,
    // Resolved values of the group's extra wrapper arguments, keyed by
    // argument name.
    pub extra: HashMap<String, String>,
}
127
/// Build the complete dynamic GraphQL schema from the cube registry.
///
/// Registers, in order: shared scalar/enum/input primitives, host-supplied
/// extra enums, per-group network enums, per-cube record/filter types, one
/// wrapper object per chain group (whose resolver stashes a [`ChainContext`]
/// for child cube fields), and an internal `_cubeMetadata` introspection
/// field. The registry (and optional table-name transform) are placed in
/// schema data so field resolvers can reach them at query time.
///
/// Returns the finished schema, or a [`SchemaError`] from `builder.finish()`.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    // NOTE(review): the root object is hard-coded to "Query";
    // config.root_query_name is never read here — confirm intent.
    let mut builder = Schema::build("Query", None, None);

    // --- Shared primitives used by every cube ---
    builder = builder.register(filter_types::build_limit_input());
    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }
    builder = builder.register(
        Scalar::new("DateTime")
            .description("ISO 8601 date-time string (e.g. \"2026-04-01T00:00:00Z\")"),
    );
    builder = builder.register(
        Scalar::new("ISO8601DateTime")
            .description("ISO 8601 date-time string (alias for DateTime, V1 compatibility)"),
    );
    builder = builder.register(
        Enum::new("TimeUnit")
            .description("Time unit for interval bucketing")
            .item(EnumItem::new("seconds"))
            .item(EnumItem::new("minutes"))
            .item(EnumItem::new("hours"))
            .item(EnumItem::new("days"))
            .item(EnumItem::new("weeks"))
            .item(EnumItem::new("months")),
    );
    builder = builder.register(
        InputObject::new("TimeIntervalInput")
            .description("Time bucketing interval for DateTime dimensions")
            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
    );
    builder = builder.register(
        InputObject::new("DimSelectWhere")
            .description("Post-aggregation filter for dimension values (HAVING clause)")
            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
    );

    // Host-supplied extra enums (e.g. types referenced by wrapper extra_args).
    // This consumes config.extra_types; other config fields remain usable.
    for extra_enum in config.extra_types {
        builder = builder.register(extra_enum);
    }

    // Per-group network selector enums — only when the group actually takes
    // a `network` argument.
    for grp in &config.chain_groups {
        if grp.has_network_arg {
            let enum_name = grp.network_enum_name.clone()
                .unwrap_or_else(|| format!("{}Network", grp.name));
            let mut net_enum = Enum::new(&enum_name)
                .description(format!("{} network selector", grp.name));
            for net in &grp.networks {
                net_enum = net_enum.item(EnumItem::new(net));
            }
            builder = builder.register(net_enum);
        }
    }

    // Register each cube's generated types exactly once, even if the cube
    // appears in more than one chain group.
    let mut registered_cubes: HashSet<String> = HashSet::new();
    for cube in registry.cubes() {
        if registered_cubes.contains(&cube.name) { continue; }
        registered_cubes.insert(cube.name.clone());

        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }
        for un in types.unions { builder = builder.register(un); }
    }

    let mut query = Object::new("Query");

    // One wrapper object + root field per chain group.
    for grp in &config.chain_groups {
        let wrapper_type_name = grp.name.clone();

        let mut wrapper_obj = Object::new(&wrapper_type_name);

        // Attach a queryable field for every cube that belongs to this group.
        for cube in registry.cubes() {
            if !cube.chain_groups.contains(&grp.group) { continue; }

            let cube_field = build_cube_field(
                cube,
                dialect.clone(),
                executor.clone(),
                config.stats_callback.clone(),
            );
            wrapper_obj = wrapper_obj.field(cube_field);
        }
        builder = builder.register(wrapper_obj);

        // Values captured by the wrapper resolver closure.
        let default_network = grp.networks.first().cloned().unwrap_or_default();
        let has_net_arg = grp.has_network_arg;
        let net_enum_name = grp.network_enum_name.clone()
            .unwrap_or_else(|| format!("{}Network", grp.name));
        let wrapper_type_for_resolver = wrapper_type_name.clone();
        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();

        // The wrapper resolver only builds a ChainContext (network + extra
        // args) and hands it down as the parent value for cube resolvers.
        let mut wrapper_field = Field::new(
            &wrapper_type_name,
            TypeRef::named_nn(&wrapper_type_name),
            move |ctx| {
                let default = default_network.clone();
                let has_arg = has_net_arg;
                let arg_names = extra_arg_names.clone();
                FieldFuture::new(async move {
                    // Fall back to the group's first network when the
                    // argument is absent or not an enum value.
                    let network = if has_arg {
                        ctx.args.try_get("network")
                            .ok()
                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
                            .unwrap_or(default)
                    } else {
                        default
                    };
                    // Extra args are stringified: enum name, boolean, or
                    // plain string — first representation that matches wins.
                    let mut extra = HashMap::new();
                    for arg_name in &arg_names {
                        if let Ok(val) = ctx.args.try_get(arg_name) {
                            let resolved = val.enum_name().ok().map(|s| s.to_string())
                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
                                .or_else(|| val.string().ok().map(|s| s.to_string()));
                            if let Some(v) = resolved {
                                extra.insert(arg_name.clone(), v);
                            }
                        }
                    }
                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
                })
            },
        );

        if grp.has_network_arg {
            // Multiple networks: argument is required (non-null); a single
            // network keeps it optional since the default is unambiguous.
            let net_type_ref = if grp.networks.len() > 1 {
                TypeRef::named_nn(&net_enum_name)
            } else {
                TypeRef::named(&net_enum_name)
            };
            wrapper_field = wrapper_field.argument(
                InputValue::new("network", net_type_ref)
                    .description(format!("{} network to query", wrapper_type_for_resolver)),
            );
        }
        for wa in &grp.extra_args {
            wrapper_field = wrapper_field.argument(
                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
                    .description(&wa.description),
            );
        }

        query = query.field(wrapper_field);
    }

    // Internal introspection field: JSON dump of every cube, grouped by
    // chain group. Uses its own Arc'd registry clone (schema data holds the
    // other copy).
    let metadata_registry = Arc::new(registry.clone());
    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
        .map(|g| (g.name.clone(), g.group.clone()))
        .collect();
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            let groups = metadata_groups.clone();
            FieldFuture::new(async move {
                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
                for (group_name, group_enum) in &groups {
                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
                        .filter(|c| c.chain_groups.contains(group_enum))
                        .map(serialize_cube_metadata)
                        .collect();
                    group_metadata.push(serde_json::json!({
                        "group": group_name,
                        "cubes": cubes_in_group,
                    }));
                }
                // Serialization failure degrades to an empty string rather
                // than erroring the whole query.
                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes grouped by chain");
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Registry is available to resolvers via ctx.data::<CubeRegistry>().
    builder = builder.data(registry);

    if let Some(transform) = config.table_name_transform.clone() {
        builder = builder.data(transform);
    }

    builder.finish()
}
349
350fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
351 serde_json::json!({
352 "name": cube.name,
353 "description": cube.description,
354 "schema": cube.schema,
355 "tablePattern": cube.table_pattern,
356 "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
357 "metrics": cube.metrics.iter().map(|m| {
358 let mut obj = serde_json::json!({
359 "name": m.name,
360 "returnType": format!("{:?}", m.return_type),
361 });
362 if let Some(ref tmpl) = m.expression_template {
363 obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
364 }
365 if let Some(ref desc) = m.description {
366 obj["description"] = serde_json::Value::String(desc.clone());
367 }
368 obj
369 }).collect::<Vec<_>>(),
370 "selectors": cube.selectors.iter().map(|s| {
371 serde_json::json!({
372 "name": s.graphql_name,
373 "column": s.column,
374 "type": format!("{:?}", s.dim_type),
375 })
376 }).collect::<Vec<_>>(),
377 "dimensions": serialize_dims(&cube.dimensions),
378 "joins": cube.joins.iter().map(|j| {
379 serde_json::json!({
380 "field": j.field_name,
381 "target": j.target_cube,
382 "joinType": format!("{:?}", j.join_type),
383 })
384 }).collect::<Vec<_>>(),
385 "tableRoutes": cube.table_routes.iter().map(|r| {
386 serde_json::json!({
387 "schema": r.schema,
388 "tablePattern": r.table_pattern,
389 "availableColumns": r.available_columns,
390 "priority": r.priority,
391 })
392 }).collect::<Vec<_>>(),
393 "defaultLimit": cube.default_limit,
394 "maxLimit": cube.max_limit,
395 })
396}
397
/// Build the GraphQL field for one cube on a chain-group wrapper object.
///
/// The resolver pipeline is: extract metric/quantile/calculate/alias/
/// dim-agg/time-interval requests from the selection set → parse into IR →
/// attach joins → apply the optional table-name transform → validate →
/// compile to SQL → execute → undo alias remapping → nest join columns into
/// sub-objects → report stats → return rows as owned field values.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        // Returns [CubeRecord!]! — a non-null list of non-null records.
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // Parent value is the ChainContext produced by the wrapper
                // resolver; missing parent falls back to the "sol" network.
                // NOTE(review): confirm "sol" is the intended global default.
                let chain_ctx = ctx.parent_value
                    .try_downcast_ref::<ChainContext>().ok();
                let network = chain_ctx
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Pull every kind of request out of the GraphQL selection
                // set, then hand them all to the IR parser.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // Selection-set fields that name a join definition become
                // join expressions; unknown target cubes are skipped.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                // Optional host hook rewrites the physical table name using
                // the chain context (e.g. network substitution).
                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
                    if let Some(cctx) = chain_ctx {
                        ir.table = transform(&ir.table, cctx);
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Undo dialect alias remapping: move values back under their
                // original keys without clobbering existing entries.
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Fold "alias.column"-prefixed join columns into a nested
                // JSON object under the join's GraphQL field name.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            // Collect keys first — can't mutate while iterating.
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // Per-request callback (schema data) wins over the
                // schema-wide one captured at build time.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    // Standard query arguments shared by every cube field.
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named(format!("{}LimitByInput", cube.name)))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // Shorthand top-level filter argument per selector dimension.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
553
/// Recursively serialize a dimension tree into the JSON array used by
/// `_cubeMetadata`. Leaves carry name/column/type; groups nest children;
/// array dimensions describe their element fields (scalar or union typed).
fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
    serde_json::Value::Array(dims.iter().map(|d| match d {
        DimensionNode::Leaf(dim) => {
            let mut obj = serde_json::json!({
                "name": dim.graphql_name,
                "column": dim.column,
                "type": format!("{:?}", dim.dim_type),
            });
            // "description" is only emitted when present.
            if let Some(desc) = &dim.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        DimensionNode::Group { graphql_name, description, children } => {
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "children": serialize_dims(children),
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        DimensionNode::Array { graphql_name, description, children } => {
            // Each array element field is either a scalar or a union of
            // typed variants; encode which via a "kind" discriminator.
            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
                let type_value = match &f.field_type {
                    crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
                        "kind": "scalar",
                        "scalarType": format!("{:?}", dt),
                    }),
                    crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
                        "kind": "union",
                        "variants": variants.iter().map(|v| serde_json::json!({
                            "typeName": v.type_name,
                            "fieldName": v.field_name,
                            "sourceType": format!("{:?}", v.source_type),
                        })).collect::<Vec<_>>(),
                    }),
                };
                let mut field_obj = serde_json::json!({
                    "name": f.graphql_name,
                    "column": f.column,
                    "type": type_value,
                });
                if let Some(desc) = &f.description {
                    field_obj["description"] = serde_json::Value::String(desc.clone());
                }
                field_obj
            }).collect();
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "kind": "array",
                "fields": fields,
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
    }).collect())
}
615
616fn extract_metric_requests(
620 ctx: &async_graphql::dynamic::ResolverContext,
621 cube: &CubeDefinition,
622) -> Vec<compiler::parser::MetricRequest> {
623 let mut requests = Vec::new();
624
625 for sub_field in ctx.ctx.field().selection_set() {
626 let name = sub_field.name();
627 if !cube.has_metric(name) {
628 continue;
629 }
630
631 let args = match sub_field.arguments() {
632 Ok(args) => args,
633 Err(_) => continue,
634 };
635
636 let of_dimension = args
637 .iter()
638 .find(|(k, _)| k.as_str() == "of")
639 .or_else(|| args.iter().find(|(k, _)| k.as_str() == "distinct"))
640 .and_then(|(_, v)| match v {
641 async_graphql::Value::Enum(e) => Some(e.to_string()),
642 async_graphql::Value::String(s) => Some(s.clone()),
643 _ => None,
644 })
645 .unwrap_or_else(|| "*".to_string());
646
647 let select_where_value = args
648 .iter()
649 .find(|(k, _)| k.as_str() == "selectWhere")
650 .map(|(_, v)| v.clone());
651
652 let condition_filter = args
653 .iter()
654 .find(|(k, _)| k.as_str() == "if")
655 .and_then(|(_, v)| {
656 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
657 .and_then(|f| if f.is_empty() { None } else { Some(f) })
658 });
659
660 let gql_alias = sub_field.alias().unwrap_or(name).to_string();
661 requests.push(compiler::parser::MetricRequest {
662 function: name.to_string(),
663 alias: gql_alias,
664 of_dimension,
665 select_where_value,
666 condition_filter,
667 });
668 }
669
670 requests
671}
672
673fn extract_requested_fields(
674 ctx: &async_graphql::dynamic::ResolverContext,
675 cube: &CubeDefinition,
676) -> HashSet<String> {
677 let mut fields = HashSet::new();
678 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
679 fields
680}
681
682fn collect_selection_paths(
683 field: &async_graphql::SelectionField<'_>,
684 prefix: &str,
685 out: &mut HashSet<String>,
686 metrics: &[MetricDef],
687) {
688 for sub in field.selection_set() {
689 let name = sub.name();
690 if metrics.iter().any(|m| m.name == name) {
691 continue;
692 }
693 let path = if prefix.is_empty() {
694 name.to_string()
695 } else {
696 format!("{prefix}_{name}")
697 };
698 let has_children = sub.selection_set().next().is_some();
699 if has_children {
700 collect_selection_paths(&sub, &path, out, metrics);
701 } else {
702 out.insert(path);
703 }
704 }
705}
706
/// Pairs of (aliased field path, underlying column) for leaf dimensions the
/// client requested under a GraphQL alias.
pub type FieldAliasMap = Vec<(String, String)>;
710
/// A `quantile(of: …, level: …)` field requested in the selection set.
pub struct QuantileRequest {
    // Output alias (GraphQL alias, or "quantile" when none given).
    pub alias: String,
    // Dimension path to compute the quantile over ("*" if unspecified).
    pub of_dimension: String,
    // Quantile level; defaults to 0.5 (median) when absent.
    pub level: f64,
}
717
/// A `calculate(expression: "…")` field requested in the selection set.
pub struct CalculateRequest {
    // Output alias (GraphQL alias, or "calculate" when none given).
    pub alias: String,
    // Raw expression string passed through to the query compiler.
    pub expression: String,
}
723
/// A DateTime dimension requested with an `interval:` argument
/// (time bucketing).
pub struct TimeIntervalRequest {
    // Flattened (underscore-joined) dimension path.
    pub field_path: String,
    // GraphQL alias under which the bucketed value is returned.
    pub graphql_alias: String,
    // Underlying table column for the dimension.
    pub column: String,
    // Time unit name (matches `TimeUnit` enum items, e.g. "hours").
    pub unit: String,
    // Number of units per bucket.
    pub count: i64,
}
733
/// A dimension field requested with `maximum:` / `minimum:` — i.e. an
/// argMax/argMin aggregation: return `value_column` at the row where
/// `compare_column` is largest/smallest.
pub struct DimAggRequest {
    // Flattened path of the value dimension.
    pub field_path: String,
    // GraphQL alias for the result (defaults to the field path).
    pub graphql_alias: String,
    // Column whose value is returned.
    pub value_column: String,
    // ArgMax for `maximum:`, ArgMin for `minimum:`.
    pub agg_type: DimAggType,
    // Column that is maximized/minimized.
    pub compare_column: String,
    // Optional `if:` conditional filter for this aggregate.
    pub condition_filter: Option<FilterNode>,
    // Optional raw `selectWhere:` value (post-aggregation filter).
    pub select_where_value: Option<async_graphql::Value>,
}
745
/// Collect all `quantile(of: …, level: …)` fields from the selection set.
/// Fields whose arguments fail to resolve are skipped.
fn extract_quantile_requests(
    ctx: &async_graphql::dynamic::ResolverContext,
) -> Vec<QuantileRequest> {
    let mut requests = Vec::new();
    for sub_field in ctx.ctx.field().selection_set() {
        if sub_field.name() != "quantile" { continue; }
        let args = match sub_field.arguments() {
            Ok(a) => a,
            Err(_) => continue,
        };
        // Dimension to compute the quantile over; "*" when unspecified.
        let of_dim = args.iter()
            .find(|(k, _)| k.as_str() == "of")
            .and_then(|(_, v)| match v {
                async_graphql::Value::Enum(e) => Some(e.to_string()),
                async_graphql::Value::String(s) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_else(|| "*".to_string());
        // Quantile level; defaults to the median (0.5).
        let level = args.iter()
            .find(|(k, _)| k.as_str() == "level")
            .and_then(|(_, v)| match v {
                async_graphql::Value::Number(n) => n.as_f64(),
                _ => None,
            })
            .unwrap_or(0.5);
        let alias = sub_field.alias().unwrap_or("quantile").to_string();
        requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
    }
    requests
}
776
/// Collect (alias path → column) pairs for every aliased leaf dimension in
/// the selection set, so the compiler can select columns under the client's
/// chosen aliases.
fn extract_field_aliases(
    ctx: &async_graphql::dynamic::ResolverContext,
    cube: &CubeDefinition,
) -> FieldAliasMap {
    let flat = cube.flat_dimensions();
    let mut aliases = Vec::new();
    collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
    aliases
}
786
/// Recursive worker for `extract_field_aliases`: walks the selection tree
/// and records, for each aliased leaf that maps to a known dimension path,
/// the underscore-joined alias path and the dimension's physical column.
fn collect_field_aliases(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut FieldAliasMap,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        // Metrics and the special calculate/quantile fields are not
        // dimensions and never contribute aliases.
        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
            continue;
        }
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_field_aliases(&sub, &path, flat, metrics, out);
        } else if let Some(alias) = sub.alias() {
            // Only record aliases for paths that resolve to a real dimension.
            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                // The alias replaces only the leaf segment of the path.
                let alias_path = if prefix.is_empty() {
                    alias.to_string()
                } else {
                    format!("{prefix}_{alias}")
                };
                out.push((alias_path, dim.column.clone()));
            }
        }
    }
}
815
816fn extract_calculate_requests(
817 ctx: &async_graphql::dynamic::ResolverContext,
818) -> Vec<CalculateRequest> {
819 let mut requests = Vec::new();
820 for sub_field in ctx.ctx.field().selection_set() {
821 if sub_field.name() != "calculate" { continue; }
822 let args = match sub_field.arguments() {
823 Ok(a) => a,
824 Err(_) => continue,
825 };
826 let expression = args.iter()
827 .find(|(k, _)| k.as_str() == "expression")
828 .and_then(|(_, v)| match v {
829 async_graphql::Value::String(s) => Some(s.clone()),
830 _ => None,
831 });
832 if let Some(expr) = expression {
833 let alias = sub_field.alias().unwrap_or("calculate").to_string();
834 requests.push(CalculateRequest { alias, expression: expr });
835 }
836 }
837 requests
838}
839
/// Collect all dimension fields carrying a `maximum:` / `minimum:` argument
/// (argMax/argMin aggregations) from the selection set.
fn extract_dim_agg_requests(
    ctx: &async_graphql::dynamic::ResolverContext,
    cube: &CubeDefinition,
) -> Vec<DimAggRequest> {
    let flat = cube.flat_dimensions();
    let mut requests = Vec::new();
    collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
    requests
}
849
850fn collect_dim_agg_paths(
851 field: &async_graphql::SelectionField<'_>,
852 prefix: &str,
853 flat: &[(String, crate::cube::definition::Dimension)],
854 metrics: &[MetricDef],
855 cube: &CubeDefinition,
856 out: &mut Vec<DimAggRequest>,
857) {
858 for sub in field.selection_set() {
859 let name = sub.name();
860 if metrics.iter().any(|m| m.name == name) {
861 continue;
862 }
863 let path = if prefix.is_empty() {
864 name.to_string()
865 } else {
866 format!("{prefix}_{name}")
867 };
868 let has_children = sub.selection_set().next().is_some();
869 if has_children {
870 collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
871 } else {
872 let args = match sub.arguments() {
873 Ok(a) => a,
874 Err(_) => continue,
875 };
876 let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
877 let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
878
879 let (agg_type, compare_path) = if let Some((_, v)) = max_val {
880 let cp = match v {
881 async_graphql::Value::Enum(e) => e.to_string(),
882 async_graphql::Value::String(s) => s.clone(),
883 _ => continue,
884 };
885 (DimAggType::ArgMax, cp)
886 } else if let Some((_, v)) = min_val {
887 let cp = match v {
888 async_graphql::Value::Enum(e) => e.to_string(),
889 async_graphql::Value::String(s) => s.clone(),
890 _ => continue,
891 };
892 (DimAggType::ArgMin, cp)
893 } else {
894 continue;
895 };
896
897 let value_column = flat.iter()
898 .find(|(p, _)| p == &path)
899 .map(|(_, dim)| dim.column.clone());
900 let compare_column = flat.iter()
901 .find(|(p, _)| p == &compare_path)
902 .map(|(_, dim)| dim.column.clone());
903
904 if let (Some(vc), Some(cc)) = (value_column, compare_column) {
905 let condition_filter = args.iter()
906 .find(|(k, _)| k.as_str() == "if")
907 .and_then(|(_, v)| {
908 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
909 .and_then(|f| if f.is_empty() { None } else { Some(f) })
910 });
911 let select_where_value = args.iter()
912 .find(|(k, _)| k.as_str() == "selectWhere")
913 .map(|(_, v)| v.clone());
914
915 let gql_alias = sub.alias()
916 .map(|a| a.to_string())
917 .unwrap_or_else(|| path.clone());
918 out.push(DimAggRequest {
919 field_path: path,
920 graphql_alias: gql_alias,
921 value_column: vc,
922 agg_type,
923 compare_column: cc,
924 condition_filter,
925 select_where_value,
926 });
927 }
928 }
929 }
930}
931
/// Collect all dimension fields carrying an `interval:` argument (time
/// bucketing) from the selection set.
fn extract_time_interval_requests(
    ctx: &async_graphql::dynamic::ResolverContext,
    cube: &CubeDefinition,
) -> Vec<TimeIntervalRequest> {
    let flat = cube.flat_dimensions();
    let mut requests = Vec::new();
    collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
    requests
}
941
/// Recursive worker for `extract_time_interval_requests`: walks the
/// selection tree and, for each leaf dimension with a well-formed
/// `interval: { in: …, count: … }` argument that resolves to a known
/// dimension path, emits a `TimeIntervalRequest`.
fn collect_time_interval_paths(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut Vec<TimeIntervalRequest>,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        if metrics.iter().any(|m| m.name == name) { continue; }
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_time_interval_paths(&sub, &path, flat, metrics, out);
        } else {
            let args = match sub.arguments() {
                Ok(a) => a,
                Err(_) => continue,
            };
            // Only object-valued `interval:` arguments are considered.
            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
                // `in` is the TimeUnit enum (or string); `count` the bucket
                // size. Both must be present and well-typed.
                let unit = obj.get("in")
                    .and_then(|v| match v {
                        async_graphql::Value::Enum(e) => Some(e.to_string()),
                        async_graphql::Value::String(s) => Some(s.clone()),
                        _ => None,
                    });
                let count = obj.get("count")
                    .and_then(|v| match v {
                        async_graphql::Value::Number(n) => n.as_i64(),
                        _ => None,
                    });
                if let (Some(unit), Some(count)) = (unit, count) {
                    // The path must resolve to a known dimension to get its
                    // physical column.
                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                        let gql_alias = sub.alias().unwrap_or(name).to_string();
                        out.push(TimeIntervalRequest {
                            field_path: path,
                            graphql_alias: gql_alias,
                            column: dim.column.clone(),
                            unit,
                            count,
                        });
                    }
                }
            }
        }
    }
}
990
/// All GraphQL types generated for a single cube, returned from
/// `build_cube_types` so the caller can register each kind on the schema
/// builder.
struct CubeTypes {
    objects: Vec<Object>,
    inputs: Vec<InputObject>,
    enums: Vec<Enum>,
    unions: Vec<Union>,
}
1001
1002fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
1003 let record_name = format!("{}Record", cube.name);
1004 let filter_name = format!("{}Filter", cube.name);
1005 let compare_enum_name = format!("{}CompareFields", cube.name);
1006
1007 let flat_dims = cube.flat_dimensions();
1008
1009 let mut compare_enum = Enum::new(&compare_enum_name)
1010 .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
1011 for (path, _) in &flat_dims {
1012 compare_enum = compare_enum.item(EnumItem::new(path));
1013 }
1014
1015 let mut record_fields: Vec<Field> = Vec::new();
1016 let mut filter_fields: Vec<InputValue> = Vec::new();
1017 let mut extra_objects: Vec<Object> = Vec::new();
1018 let mut extra_inputs: Vec<InputObject> = Vec::new();
1019 let mut extra_unions: Vec<Union> = Vec::new();
1020
1021 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
1022 .description("OR combinator — matches if any sub-filter matches"));
1023
1024 {
1025 let mut collector = DimCollector {
1026 cube_name: &cube.name,
1027 compare_enum_name: &compare_enum_name,
1028 filter_name: &filter_name,
1029 record_fields: &mut record_fields,
1030 filter_fields: &mut filter_fields,
1031 extra_objects: &mut extra_objects,
1032 extra_inputs: &mut extra_inputs,
1033 extra_unions: &mut extra_unions,
1034 };
1035 for node in &cube.dimensions {
1036 collect_dimension_types(node, "", &mut collector);
1037 }
1038 }
1039
1040 let mut metric_enums: Vec<Enum> = Vec::new();
1041 let builtin_descs: std::collections::HashMap<&str, &str> = [
1042 ("count", "Count of rows or distinct values"),
1043 ("sum", "Sum of values"),
1044 ("avg", "Average of values"),
1045 ("min", "Minimum value"),
1046 ("max", "Maximum value"),
1047 ("uniq", "Count of unique (distinct) values"),
1048 ].into_iter().collect();
1049
1050 for metric in &cube.metrics {
1051 let metric_name = &metric.name;
1052 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1053
1054 if metric.supports_where {
1055 extra_inputs.push(
1056 InputObject::new(&select_where_name)
1057 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1058 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1059 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1060 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1061 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1062 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1063 );
1064 }
1065
1066 let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1067 let mut of_enum = Enum::new(&of_enum_name)
1068 .description(format!("Dimension to apply {} aggregation on", metric_name));
1069 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1070 metric_enums.push(of_enum);
1071
1072 let metric_name_clone = metric_name.clone();
1073 let return_type_ref = dim_type_to_typeref(&metric.return_type);
1074 let metric_desc = metric.description.as_deref()
1075 .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1076 .unwrap_or("Aggregate metric");
1077
1078 let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1079 let default_name = metric_name_clone.clone();
1080 FieldFuture::new(async move {
1081 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1082 let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1083 let key = metric_key(alias);
1084 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1085 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1086 })
1087 })
1088 .description(metric_desc)
1089 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1090 .description("Dimension to aggregate on (default: all rows)"));
1091
1092 if metric_name == "count" {
1093 metric_field = metric_field
1094 .argument(InputValue::new("distinct", TypeRef::named(&of_enum_name))
1095 .description("Count distinct values of this dimension (alias for of, maps to uniqExact)"));
1096 }
1097
1098 if metric.supports_where {
1099 metric_field = metric_field
1100 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1101 .description("Post-aggregation filter (HAVING)"))
1102 .argument(InputValue::new("if", TypeRef::named(&filter_name))
1103 .description("Conditional filter for this metric"));
1104 }
1105
1106 record_fields.push(metric_field);
1107 }
1108
1109 {
1111 let of_enum_name = format!("{}_quantile_Of", cube.name);
1112 let mut of_enum = Enum::new(&of_enum_name)
1113 .description(format!("Dimension to apply quantile on for {}", cube.name));
1114 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1115 metric_enums.push(of_enum);
1116
1117 let of_enum_for_closure = of_enum_name.clone();
1118 let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1119 FieldFuture::new(async move {
1120 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1121 let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1122 let key = metric_key(alias);
1123 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1124 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1125 })
1126 })
1127 .description("Compute a quantile (percentile) of a dimension")
1128 .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1129 .description("Dimension to compute quantile on"))
1130 .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1131 .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1132 record_fields.push(quantile_field);
1133 }
1134
1135 {
1137 let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1138 FieldFuture::new(async move {
1139 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1140 let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1141 let key = metric_key(alias);
1142 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1143 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1144 })
1145 })
1146 .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1147 .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1148 .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1149 record_fields.push(calculate_field);
1150 }
1151
1152 for jd in &cube.joins {
1154 let target_record_name = format!("{}Record", jd.target_cube);
1155 let field_name_owned = jd.field_name.clone();
1156 let mut join_field = Field::new(
1157 &jd.field_name,
1158 TypeRef::named(&target_record_name),
1159 move |ctx| {
1160 let field_name = field_name_owned.clone();
1161 FieldFuture::new(async move {
1162 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1163 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1164 let sub_row: RowMap = obj.iter()
1165 .map(|(k, v)| (k.clone(), v.clone()))
1166 .collect();
1167 Ok(Some(FieldValue::owned_any(sub_row)))
1168 } else {
1169 Ok(Some(FieldValue::value(Value::Null)))
1170 }
1171 })
1172 },
1173 );
1174 if let Some(desc) = &jd.description {
1175 join_field = join_field.description(desc);
1176 }
1177 record_fields.push(join_field);
1178 }
1179
1180 let mut record = Object::new(&record_name);
1181 for f in record_fields { record = record.field(f); }
1182
1183 let mut filter = InputObject::new(&filter_name)
1184 .description(format!("Filter conditions for {} query", cube.name));
1185 for f in filter_fields { filter = filter.field(f); }
1186
1187 let orderby_input_name = format!("{}OrderByInput", cube.name);
1188 let orderby_input = InputObject::new(&orderby_input_name)
1189 .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1190 .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1191 .description("Sort descending by this field"))
1192 .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1193 .description("Sort ascending by this field"))
1194 .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1195 .description("Sort descending by computed/aggregated field name"))
1196 .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1197 .description("Sort ascending by computed/aggregated field name"));
1198
1199 let limitby_input_name = format!("{}LimitByInput", cube.name);
1200 let limitby_input = InputObject::new(&limitby_input_name)
1201 .description(format!("Per-group row limit for {}", cube.name))
1202 .field(InputValue::new("by", TypeRef::named_nn(&compare_enum_name))
1203 .description("Dimension field to group by"))
1204 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
1205 .description("Maximum rows per group"))
1206 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
1207 .description("Rows to skip per group"));
1208
1209 let mut objects = vec![record]; objects.extend(extra_objects);
1210 let mut inputs = vec![filter, orderby_input, limitby_input]; inputs.extend(extra_inputs);
1211 let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1212
1213 CubeTypes { objects, inputs, enums, unions: extra_unions }
1214}
1215
/// Mutable accumulator threaded through `collect_dimension_types`.
///
/// Carries borrowed naming context for the cube currently being processed,
/// plus the output vectors that generated GraphQL artifacts are appended to.
/// Nested dimension groups build a child collector that shares the `extra_*`
/// sinks but substitutes its own `record_fields`/`filter_fields` vectors.
struct DimCollector<'a> {
    // Name of the cube owning these dimensions; namespaces generated type names.
    cube_name: &'a str,
    // Name of the cube's compare-field enum (used for argMax/argMin arguments).
    compare_enum_name: &'a str,
    // Name of the cube's root filter input (used for `if` arguments).
    filter_name: &'a str,
    // Output: resolver fields for the cube's Record object.
    record_fields: &'a mut Vec<Field>,
    // Output: input fields for the cube's Filter input object.
    filter_fields: &'a mut Vec<InputValue>,
    // Output: additional object types (nested records, union variants, elements).
    extra_objects: &'a mut Vec<Object>,
    // Output: additional input types (nested filters, array/includes filters).
    extra_inputs: &'a mut Vec<InputObject>,
    // Output: union types generated for array union fields.
    extra_unions: &'a mut Vec<Union>,
}
1226
1227fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1228 match node {
1229 DimensionNode::Leaf(dim) => {
1230 let col = dim.column.clone();
1231 let is_datetime = dim.dim_type == DimType::DateTime;
1232 let compare_enum = c.compare_enum_name.to_string();
1233 let cube_filter = c.filter_name.to_string();
1234 let mut leaf_field = Field::new(
1235 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1236 move |ctx| {
1237 let col = col.clone();
1238 FieldFuture::new(async move {
1239 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1240 let has_interval = ctx.args.try_get("interval").is_ok();
1241 let has_max = ctx.args.try_get("maximum").is_ok();
1242 let has_min = ctx.args.try_get("minimum").is_ok();
1243 let key = if has_interval || has_max || has_min {
1244 let name = ctx.ctx.field().alias().unwrap_or(&col);
1245 dim_agg_key(name)
1246 } else {
1247 col.clone()
1248 };
1249 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1250 let gql_val = if is_datetime {
1251 json_to_gql_datetime(val)
1252 } else {
1253 json_to_gql_value(val)
1254 };
1255 Ok(Some(FieldValue::value(gql_val)))
1256 })
1257 },
1258 );
1259 if let Some(desc) = &dim.description {
1260 leaf_field = leaf_field.description(desc);
1261 }
1262 leaf_field = leaf_field
1263 .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1264 .description("Return value from row where compare field is maximum (argMax)"))
1265 .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1266 .description("Return value from row where compare field is minimum (argMin)"))
1267 .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1268 .description("Conditional filter for aggregation"))
1269 .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1270 .description("Post-aggregation value filter (HAVING)"));
1271 if is_datetime {
1272 leaf_field = leaf_field
1273 .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1274 .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1275 }
1276 c.record_fields.push(leaf_field);
1279 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1280 }
1281 DimensionNode::Group { graphql_name, description, children } => {
1282 let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1283 let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1284 let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1285
1286 let mut child_record_fields: Vec<Field> = Vec::new();
1287 let mut child_filter_fields: Vec<InputValue> = Vec::new();
1288 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1289
1290 let mut child_collector = DimCollector {
1291 cube_name: c.cube_name,
1292 compare_enum_name: c.compare_enum_name,
1293 filter_name: c.filter_name,
1294 record_fields: &mut child_record_fields,
1295 filter_fields: &mut child_filter_fields,
1296 extra_objects: c.extra_objects,
1297 extra_inputs: c.extra_inputs,
1298 extra_unions: c.extra_unions,
1299 };
1300 for child in children {
1301 collect_dimension_types(child, &new_prefix, &mut child_collector);
1302 }
1303
1304 let mut nested_record = Object::new(&nested_record_name);
1305 for f in child_record_fields { nested_record = nested_record.field(f); }
1306
1307 let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1308 let mut nested_filter = InputObject::new(&nested_filter_name)
1309 .description(nested_filter_desc);
1310 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1311
1312 let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1313 FieldFuture::new(async move {
1314 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1315 Ok(Some(FieldValue::owned_any(row.clone())))
1316 })
1317 });
1318 if let Some(desc) = description {
1319 group_field = group_field.description(desc);
1320 }
1321 c.record_fields.push(group_field);
1322 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1323 c.extra_objects.push(nested_record);
1324 c.extra_inputs.push(nested_filter);
1325 }
1326 DimensionNode::Array { graphql_name, description, children } => {
1327 let full_path = if prefix.is_empty() {
1328 graphql_name.clone()
1329 } else {
1330 format!("{prefix}_{graphql_name}")
1331 };
1332 let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1333 let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1334
1335 let mut element_obj = Object::new(&element_type_name);
1336 let mut includes_filter = InputObject::new(&includes_filter_name)
1337 .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1338
1339 let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1340
1341 for child in children {
1342 match &child.field_type {
1343 crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1344 let col_name = child.column.clone();
1345 let is_datetime = *dt == DimType::DateTime;
1346 let mut field = Field::new(
1347 &child.graphql_name,
1348 dim_type_to_typeref(dt),
1349 move |ctx| {
1350 let col = col_name.clone();
1351 FieldFuture::new(async move {
1352 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1353 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1354 let gql_val = if is_datetime {
1355 json_to_gql_datetime(val)
1356 } else {
1357 json_to_gql_value(val)
1358 };
1359 Ok(Some(FieldValue::value(gql_val)))
1360 })
1361 },
1362 );
1363 if let Some(desc) = &child.description {
1364 field = field.description(desc);
1365 }
1366 element_obj = element_obj.field(field);
1367 includes_filter = includes_filter.field(
1368 InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1369 );
1370 }
1371 crate::cube::definition::ArrayFieldType::Union(variants) => {
1372 let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1373
1374 let mut gql_union = Union::new(&union_name);
1375 let mut variant_objects = Vec::new();
1376
1377 for v in variants {
1378 let variant_obj_name = v.type_name.clone();
1379 let field_name = v.field_name.clone();
1380 let source_type = v.source_type.clone();
1381 let type_ref = dim_type_to_typeref(&source_type);
1382
1383 let val_col = child.column.clone();
1384 let variant_obj = Object::new(&variant_obj_name)
1385 .field(Field::new(
1386 &field_name,
1387 type_ref,
1388 move |ctx| {
1389 let col = val_col.clone();
1390 FieldFuture::new(async move {
1391 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1392 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1393 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1394 })
1395 },
1396 ));
1397 gql_union = gql_union.possible_type(&variant_obj_name);
1398 variant_objects.push(variant_obj);
1399 }
1400
1401 let col_name = child.column.clone();
1402 let type_col = children.iter()
1403 .find(|f| f.graphql_name == "Type")
1404 .map(|f| f.column.clone())
1405 .unwrap_or_default();
1406 let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1407
1408 let mut field = Field::new(
1409 &child.graphql_name,
1410 TypeRef::named(&union_name),
1411 move |ctx| {
1412 let col = col_name.clone();
1413 let tcol = type_col.clone();
1414 let vars = variants_clone.clone();
1415 FieldFuture::new(async move {
1416 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1417 let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1418 if raw_val.is_null() || raw_val.as_str() == Some("") {
1419 return Ok(None);
1420 }
1421 let type_str = row.get(&tcol)
1422 .and_then(|v| v.as_str())
1423 .unwrap_or("");
1424 let resolved_type = resolve_union_typename(type_str, &vars);
1425 let mut elem = RowMap::new();
1426 elem.insert(col.clone(), raw_val);
1427 Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1428 })
1429 },
1430 );
1431 if let Some(desc) = &child.description {
1432 field = field.description(desc);
1433 }
1434 element_obj = element_obj.field(field);
1435
1436 includes_filter = includes_filter.field(
1437 InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1438 );
1439
1440 union_registrations.push((union_name, gql_union, variant_objects));
1441 }
1442 }
1443 }
1444
1445 for (_, union_type, variant_objs) in union_registrations {
1447 c.extra_objects.extend(variant_objs);
1448 c.extra_unions.push(union_type);
1456 }
1457
1458 let child_columns: Vec<(String, String)> = children.iter()
1460 .map(|f| (f.graphql_name.clone(), f.column.clone()))
1461 .collect();
1462 let element_type_name_clone = element_type_name.clone();
1463
1464 let mut array_field = Field::new(
1465 graphql_name,
1466 TypeRef::named_nn_list_nn(&element_type_name),
1467 move |ctx| {
1468 let cols = child_columns.clone();
1469 let _etype = element_type_name_clone.clone();
1470 FieldFuture::new(async move {
1471 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1472 let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1473 .map(|(gql_name, col)| {
1474 let arr = row.get(col)
1475 .and_then(|v| v.as_array())
1476 .cloned()
1477 .unwrap_or_default();
1478 (gql_name.as_str(), arr)
1479 })
1480 .collect();
1481
1482 let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1483 let mut elements = Vec::with_capacity(len);
1484 for i in 0..len {
1485 let mut elem = RowMap::new();
1486 for (gql_name, arr) in &arrays {
1487 let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1488 elem.insert(gql_name.to_string(), val);
1489 }
1490 for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1492 let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1493 elem.insert(col.clone(), val);
1494 }
1495 elements.push(FieldValue::owned_any(elem));
1496 }
1497 Ok(Some(FieldValue::list(elements)))
1498 })
1499 },
1500 );
1501 if let Some(desc) = description {
1502 array_field = array_field.description(desc);
1503 }
1504 c.record_fields.push(array_field);
1505
1506 let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
1510 let wrapper_filter = InputObject::new(&wrapper_filter_name)
1511 .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
1512 .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
1513 .description("Match rows where at least one array element satisfies all conditions"));
1514 c.extra_inputs.push(wrapper_filter);
1515
1516 c.filter_fields.push(InputValue::new(
1517 graphql_name,
1518 TypeRef::named(&wrapper_filter_name),
1519 ));
1520
1521 c.extra_objects.push(element_obj);
1522 c.extra_inputs.push(includes_filter);
1523 }
1524 }
1525}
1526
1527fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1533 for v in variants {
1534 if v.source_type_names.iter().any(|s| s == type_str) {
1535 return v.type_name.clone();
1536 }
1537 }
1538 variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1539}
1540
1541fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1542 match dt {
1543 DimType::String | DimType::Decimal | DimType::Date => TypeRef::named(TypeRef::STRING),
1544 DimType::DateTime => TypeRef::named("DateTime"),
1545 DimType::Int => TypeRef::named(TypeRef::INT),
1546 DimType::Float => TypeRef::named(TypeRef::FLOAT),
1547 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1548 }
1549}
1550
1551fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1552 match dt {
1553 DimType::String => "StringFilter",
1554 DimType::Int => "IntFilter",
1555 DimType::Float => "FloatFilter",
1556 DimType::Decimal => "DecimalFilter",
1557 DimType::Date => "DateFilter",
1558 DimType::DateTime => "DateTimeFilter",
1559 DimType::Bool => "BoolFilter",
1560 }
1561}
1562
1563pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1564 match v {
1565 serde_json::Value::Null => Value::Null,
1566 serde_json::Value::Bool(b) => Value::from(b),
1567 serde_json::Value::Number(n) => {
1568 if let Some(i) = n.as_i64() { Value::from(i) }
1569 else if let Some(f) = n.as_f64() { Value::from(f) }
1570 else { Value::from(n.to_string()) }
1571 }
1572 serde_json::Value::String(s) => Value::from(s),
1573 _ => Value::from(v.to_string()),
1574 }
1575}
1576
1577fn build_join_expr(
1580 jd: &crate::cube::definition::JoinDef,
1581 target_cube: &CubeDefinition,
1582 sub_field: &async_graphql::SelectionField<'_>,
1583 network: &str,
1584 join_idx: usize,
1585) -> JoinExpr {
1586 let target_flat = target_cube.flat_dimensions();
1587 let target_table = target_cube.table_for_chain(network);
1588
1589 let mut requested_paths = HashSet::new();
1590 collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1591
1592 let mut selects: Vec<SelectExpr> = target_flat.iter()
1593 .filter(|(path, _)| requested_paths.contains(path))
1594 .map(|(_, dim)| SelectExpr::Column {
1595 column: dim.column.clone(),
1596 alias: None,
1597 })
1598 .collect();
1599
1600 if selects.is_empty() {
1601 selects = target_flat.iter()
1602 .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1603 .collect();
1604 }
1605
1606 let is_aggregate = target_flat.iter().any(|(_, dim)| is_aggregate_expr(&dim.column));
1607
1608 let group_by = if is_aggregate {
1609 let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1610 for sel in &selects {
1611 if let SelectExpr::Column { column, .. } = sel {
1612 if !is_aggregate_expr(column) && !gb.contains(column) {
1613 gb.push(column.clone());
1614 }
1615 }
1616 }
1617 gb
1618 } else {
1619 vec![]
1620 };
1621
1622 JoinExpr {
1623 schema: target_cube.schema.clone(),
1624 table: target_table,
1625 alias: format!("_j{}", join_idx),
1626 conditions: jd.conditions.clone(),
1627 selects,
1628 group_by,
1629 use_final: target_cube.use_final,
1630 is_aggregate,
1631 target_cube: jd.target_cube.clone(),
1632 join_field: sub_field.name().to_string(),
1633 join_type: jd.join_type.clone(),
1634 }
1635}
1636
1637fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1640 match v {
1641 serde_json::Value::String(s) => {
1642 let iso = if s.contains('T') {
1643 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1644 } else {
1645 let replaced = s.replacen(' ', "T", 1);
1646 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1647 };
1648 Value::from(iso)
1649 }
1650 other => json_to_gql_value(other),
1651 }
1652}