1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Row-map key under which an aggregate metric value is stored (`__{alias}`).
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Row-map key under which a dimension-aggregation value is stored (`__da_{alias}`).
pub fn dim_agg_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 5);
    key.push_str("__da_");
    key.push_str(alias);
    key
}
20
/// Async SQL execution callback: receives the compiled SQL string and its
/// bound parameter values, resolving to result rows or an error message.
/// Pinned/boxed so one instance can be stored and shared across resolver
/// closures (`Arc` + `Send + Sync`).
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// Declaration of an extra GraphQL argument exposed on a chain-group wrapper
/// field; resolved values are copied into `ChainContext::extra` keyed by name.
#[derive(Debug, Clone)]
pub struct WrapperArg {
    /// GraphQL argument name.
    pub name: String,
    /// Name of the argument's GraphQL input type.
    pub type_ref: String,
    /// Description shown in schema introspection.
    pub description: String,
    /// NOTE(review): not read anywhere in this file — presumably the allowed
    /// values for enum-like arguments; confirm against the consuming code.
    pub values: Option<Vec<String>>,
}
43
/// Configuration for one chain-group wrapper exposed under the root query
/// (e.g. "EVM", "Solana", "Trading").
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    /// Wrapper object/field name in the schema.
    pub name: String,
    /// Which cube chain group this wrapper exposes.
    pub group: ChainGroup,
    /// Networks served by this group; the first entry is the default network.
    pub networks: Vec<String>,
    /// When true, a required `{name}Network` enum argument is added.
    pub has_network_arg: bool,
    /// Additional wrapper arguments forwarded into `ChainContext::extra`.
    pub extra_args: Vec<WrapperArg>,
}
60
/// Hook that rewrites a compiled table name per request, given the resolved
/// chain context (network + extra wrapper arguments); stored in schema data.
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
65
/// Top-level options consumed by `build_schema`.
pub struct SchemaConfig {
    /// Chain-group wrappers to expose under the root query object.
    pub chain_groups: Vec<ChainGroupConfig>,
    /// NOTE(review): not consulted by `build_schema` in this file (the root
    /// type is hard-coded to "Query") — confirm whether this is used
    /// elsewhere or is dead configuration.
    pub root_query_name: String,
    /// Fallback per-query stats callback (a callback in schema data, if any,
    /// takes precedence at resolve time).
    pub stats_callback: Option<StatsCallback>,
    /// Optional hook rewriting table names per chain context.
    pub table_name_transform: Option<TableNameTransform>,
    /// Extra enum types registered verbatim on the schema.
    pub extra_types: Vec<Enum>,
}
78
79impl Default for SchemaConfig {
80 fn default() -> Self {
81 Self {
82 chain_groups: vec![
83 ChainGroupConfig {
84 name: "EVM".to_string(),
85 group: ChainGroup::Evm,
86 networks: vec!["eth".into(), "bsc".into()],
87 has_network_arg: true,
88 extra_args: vec![],
89 },
90 ChainGroupConfig {
91 name: "Solana".to_string(),
92 group: ChainGroup::Solana,
93 networks: vec!["sol".into()],
94 has_network_arg: false,
95 extra_args: vec![],
96 },
97 ChainGroupConfig {
98 name: "Trading".to_string(),
99 group: ChainGroup::Trading,
100 networks: vec!["sol".into(), "eth".into(), "bsc".into()],
101 has_network_arg: false,
102 extra_args: vec![],
103 },
104 ],
105 root_query_name: "ChainStream".to_string(),
106 stats_callback: None,
107 table_name_transform: None,
108 extra_types: vec![],
109 }
110 }
111}
112
/// Per-request chain selection produced by a chain-group wrapper resolver and
/// consumed by cube field resolvers (network routing, table-name transform).
pub struct ChainContext {
    /// Selected network identifier (e.g. "eth", "sol").
    pub network: String,
    /// Stringified values of any extra wrapper arguments, keyed by arg name.
    pub extra: HashMap<String, String>,
}
120
/// Builds the dynamic GraphQL schema for all registered cubes.
///
/// Registration order: shared primitive inputs/enums, caller-provided extra
/// enums, per-group network enums, per-cube generated types, the chain-group
/// wrapper objects, then the root `Query` object. The `registry` and the
/// optional table-name transform are stored as schema data so resolvers can
/// fetch them at execution time.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    // NOTE(review): the root type name is hard-coded to "Query";
    // `config.root_query_name` is not consulted here — confirm intent.
    let mut builder = Schema::build("Query", None, None);

    // Shared pagination / ordering / filter primitive types.
    builder = builder.register(filter_types::build_limit_input());
    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }
    builder = builder.register(
        Enum::new("TimeUnit")
            .description("Time unit for interval bucketing")
            .item(EnumItem::new("seconds"))
            .item(EnumItem::new("minutes"))
            .item(EnumItem::new("hours"))
            .item(EnumItem::new("days"))
            .item(EnumItem::new("weeks"))
            .item(EnumItem::new("months")),
    );
    builder = builder.register(
        InputObject::new("TimeIntervalInput")
            .description("Time bucketing interval for DateTime dimensions")
            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
    );
    builder = builder.register(
        InputObject::new("DimSelectWhere")
            .description("Post-aggregation filter for dimension values (HAVING clause)")
            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
    );

    // Caller-supplied enum types. This moves `extra_types` out of `config`
    // (a partial move); the remaining fields stay usable below.
    for extra_enum in config.extra_types {
        builder = builder.register(extra_enum);
    }

    // One `{Group}Network` enum per group that takes a `network` argument.
    for grp in &config.chain_groups {
        if grp.has_network_arg {
            let enum_name = format!("{}Network", grp.name);
            let mut net_enum = Enum::new(&enum_name)
                .description(format!("{} network selector", grp.name));
            for net in &grp.networks {
                net_enum = net_enum.item(EnumItem::new(net));
            }
            builder = builder.register(net_enum);
        }
    }

    // Register each cube's generated types exactly once, even if the cube
    // appears under multiple chain groups.
    let mut registered_cubes: HashSet<String> = HashSet::new();
    for cube in registry.cubes() {
        if registered_cubes.contains(&cube.name) { continue; }
        registered_cubes.insert(cube.name.clone());

        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }
        for un in types.unions { builder = builder.register(un); }
    }

    let mut query = Object::new("Query");

    // One wrapper object + root field per chain group. The wrapper resolver
    // only captures the chain selection into a `ChainContext`; the real SQL
    // work happens in the cube fields beneath it.
    for grp in &config.chain_groups {
        let wrapper_type_name = grp.name.clone();

        let mut wrapper_obj = Object::new(&wrapper_type_name);

        // Attach every cube belonging to this chain group.
        for cube in registry.cubes() {
            if !cube.chain_groups.contains(&grp.group) { continue; }

            let cube_field = build_cube_field(
                cube,
                dialect.clone(),
                executor.clone(),
                config.stats_callback.clone(),
            );
            wrapper_obj = wrapper_obj.field(cube_field);
        }
        builder = builder.register(wrapper_obj);

        // The first listed network is the default when no argument is given.
        let default_network = grp.networks.first().cloned().unwrap_or_default();
        let has_net_arg = grp.has_network_arg;
        let net_enum_name = format!("{}Network", grp.name);
        let wrapper_type_for_resolver = wrapper_type_name.clone();
        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();

        let mut wrapper_field = Field::new(
            &wrapper_type_name,
            TypeRef::named_nn(&wrapper_type_name),
            move |ctx| {
                let default = default_network.clone();
                let has_arg = has_net_arg;
                let arg_names = extra_arg_names.clone();
                FieldFuture::new(async move {
                    // Resolve the network from the `network` enum argument,
                    // falling back to the group default.
                    let network = if has_arg {
                        ctx.args.try_get("network")
                            .ok()
                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
                            .unwrap_or(default)
                    } else {
                        default
                    };
                    // Stringify any extra wrapper args (enum, bool, or string)
                    // so downstream hooks can read them uniformly.
                    let mut extra = HashMap::new();
                    for arg_name in &arg_names {
                        if let Ok(val) = ctx.args.try_get(arg_name) {
                            let resolved = val.enum_name().ok().map(|s| s.to_string())
                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
                                .or_else(|| val.string().ok().map(|s| s.to_string()));
                            if let Some(v) = resolved {
                                extra.insert(arg_name.clone(), v);
                            }
                        }
                    }
                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
                })
            },
        );

        if grp.has_network_arg {
            wrapper_field = wrapper_field.argument(
                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
                    .description(format!("{} network to query", wrapper_type_for_resolver)),
            );
        }
        for wa in &grp.extra_args {
            wrapper_field = wrapper_field.argument(
                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
                    .description(&wa.description),
            );
        }

        query = query.field(wrapper_field);
    }

    // Internal `_cubeMetadata` field: JSON dump of all cubes per chain group,
    // intended for tooling/introspection rather than end users.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
        .map(|g| (g.name.clone(), g.group.clone()))
        .collect();
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            let groups = metadata_groups.clone();
            FieldFuture::new(async move {
                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
                for (group_name, group_enum) in &groups {
                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
                        .filter(|c| c.chain_groups.contains(group_enum))
                        .map(serialize_cube_metadata)
                        .collect();
                    group_metadata.push(serde_json::json!({
                        "group": group_name,
                        "cubes": cubes_in_group,
                    }));
                }
                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes grouped by chain");
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Stash the registry (and optional transform) in schema data for resolvers.
    builder = builder.data(registry);

    if let Some(transform) = config.table_name_transform.clone() {
        builder = builder.data(transform);
    }

    builder.finish()
}
327
/// Serializes one cube definition into the JSON shape exposed through the
/// internal `_cubeMetadata` field: identity, routing info, metrics,
/// selectors, dimension tree, joins, table routes, and row limits.
fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
    serde_json::json!({
        "name": cube.name,
        "description": cube.description,
        "schema": cube.schema,
        "tablePattern": cube.table_pattern,
        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
        "metrics": cube.metrics.iter().map(|m| {
            // Optional metric attributes are added only when present.
            let mut obj = serde_json::json!({
                "name": m.name,
                "returnType": format!("{:?}", m.return_type),
            });
            if let Some(ref tmpl) = m.expression_template {
                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
            }
            if let Some(ref desc) = m.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        }).collect::<Vec<_>>(),
        "selectors": cube.selectors.iter().map(|s| {
            serde_json::json!({
                "name": s.graphql_name,
                "column": s.column,
                "type": format!("{:?}", s.dim_type),
            })
        }).collect::<Vec<_>>(),
        "dimensions": serialize_dims(&cube.dimensions),
        "joins": cube.joins.iter().map(|j| {
            serde_json::json!({
                "field": j.field_name,
                "target": j.target_cube,
                "joinType": format!("{:?}", j.join_type),
            })
        }).collect::<Vec<_>>(),
        "tableRoutes": cube.table_routes.iter().map(|r| {
            serde_json::json!({
                "schema": r.schema,
                "tablePattern": r.table_pattern,
                "availableColumns": r.available_columns,
                "priority": r.priority,
            })
        }).collect::<Vec<_>>(),
        "defaultLimit": cube.default_limit,
        "maxLimit": cube.max_limit,
    })
}
375
/// Builds the GraphQL field for one cube, including its async resolver.
///
/// The resolver: (1) reads the network from the parent `ChainContext`,
/// (2) harvests metric / quantile / calculate / dimension-aggregation /
/// time-interval requests from the selection set, (3) parses everything into
/// the query IR, (4) adds join expressions for selected join fields,
/// (5) compiles and executes the SQL, then (6) post-processes rows (alias
/// remapping and join-column nesting) before handing each row to the
/// per-field resolvers as a `RowMap`.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // Network comes from the parent wrapper's ChainContext;
                // "sol" is the fallback when resolved outside a wrapper.
                let chain_ctx = ctx.parent_value
                    .try_downcast_ref::<ChainContext>().ok();
                let network = chain_ctx
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Harvest everything the selection set asks for, then parse
                // the whole request into the query IR.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // Selected sub-fields matching a join definition become join
                // expressions in the IR; unknown join targets are skipped.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                // Optional per-deployment table-name rewrite (schema data),
                // applied only when a chain context is available.
                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
                    if let Some(cctx) = chain_ctx {
                        ir.table = transform(&ir.table, cctx);
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Undo dialect-imposed column aliasing: move values back under
                // their original keys without clobbering existing entries.
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Fold flat `{join_alias}.{col}` columns into one nested JSON
                // object per join so record resolvers see sub-objects.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // Stats callback: a callback in schema data takes precedence
                // over the one captured at schema-build time.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    // Standard arguments shared by every cube field.
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named(format!("{}LimitByInput", cube.name)))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // One shorthand filter argument per declared selector dimension.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
531
532fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
533 serde_json::Value::Array(dims.iter().map(|d| match d {
534 DimensionNode::Leaf(dim) => {
535 let mut obj = serde_json::json!({
536 "name": dim.graphql_name,
537 "column": dim.column,
538 "type": format!("{:?}", dim.dim_type),
539 });
540 if let Some(desc) = &dim.description {
541 obj["description"] = serde_json::Value::String(desc.clone());
542 }
543 obj
544 },
545 DimensionNode::Group { graphql_name, description, children } => {
546 let mut obj = serde_json::json!({
547 "name": graphql_name,
548 "children": serialize_dims(children),
549 });
550 if let Some(desc) = description {
551 obj["description"] = serde_json::Value::String(desc.clone());
552 }
553 obj
554 },
555 DimensionNode::Array { graphql_name, description, children } => {
556 let fields: Vec<serde_json::Value> = children.iter().map(|f| {
557 let type_value = match &f.field_type {
558 crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
559 "kind": "scalar",
560 "scalarType": format!("{:?}", dt),
561 }),
562 crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
563 "kind": "union",
564 "variants": variants.iter().map(|v| serde_json::json!({
565 "typeName": v.type_name,
566 "fieldName": v.field_name,
567 "sourceType": format!("{:?}", v.source_type),
568 })).collect::<Vec<_>>(),
569 }),
570 };
571 let mut field_obj = serde_json::json!({
572 "name": f.graphql_name,
573 "column": f.column,
574 "type": type_value,
575 });
576 if let Some(desc) = &f.description {
577 field_obj["description"] = serde_json::Value::String(desc.clone());
578 }
579 field_obj
580 }).collect();
581 let mut obj = serde_json::json!({
582 "name": graphql_name,
583 "kind": "array",
584 "fields": fields,
585 });
586 if let Some(desc) = description {
587 obj["description"] = serde_json::Value::String(desc.clone());
588 }
589 obj
590 },
591 }).collect())
592}
593
594fn extract_metric_requests(
598 ctx: &async_graphql::dynamic::ResolverContext,
599 cube: &CubeDefinition,
600) -> Vec<compiler::parser::MetricRequest> {
601 let mut requests = Vec::new();
602
603 for sub_field in ctx.ctx.field().selection_set() {
604 let name = sub_field.name();
605 if !cube.has_metric(name) {
606 continue;
607 }
608
609 let args = match sub_field.arguments() {
610 Ok(args) => args,
611 Err(_) => continue,
612 };
613
614 let of_dimension = args
615 .iter()
616 .find(|(k, _)| k.as_str() == "of")
617 .and_then(|(_, v)| match v {
618 async_graphql::Value::Enum(e) => Some(e.to_string()),
619 async_graphql::Value::String(s) => Some(s.clone()),
620 _ => None,
621 })
622 .unwrap_or_else(|| "*".to_string());
623
624 let select_where_value = args
625 .iter()
626 .find(|(k, _)| k.as_str() == "selectWhere")
627 .map(|(_, v)| v.clone());
628
629 let condition_filter = args
630 .iter()
631 .find(|(k, _)| k.as_str() == "if")
632 .and_then(|(_, v)| {
633 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
634 .and_then(|f| if f.is_empty() { None } else { Some(f) })
635 });
636
637 let gql_alias = sub_field.alias().unwrap_or(name).to_string();
638 requests.push(compiler::parser::MetricRequest {
639 function: name.to_string(),
640 alias: gql_alias,
641 of_dimension,
642 select_where_value,
643 condition_filter,
644 });
645 }
646
647 requests
648}
649
650fn extract_requested_fields(
651 ctx: &async_graphql::dynamic::ResolverContext,
652 cube: &CubeDefinition,
653) -> HashSet<String> {
654 let mut fields = HashSet::new();
655 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
656 fields
657}
658
659fn collect_selection_paths(
660 field: &async_graphql::SelectionField<'_>,
661 prefix: &str,
662 out: &mut HashSet<String>,
663 metrics: &[MetricDef],
664) {
665 for sub in field.selection_set() {
666 let name = sub.name();
667 if metrics.iter().any(|m| m.name == name) {
668 continue;
669 }
670 let path = if prefix.is_empty() {
671 name.to_string()
672 } else {
673 format!("{prefix}_{name}")
674 };
675 let has_children = sub.selection_set().next().is_some();
676 if has_children {
677 collect_selection_paths(&sub, &path, out, metrics);
678 } else {
679 out.insert(path);
680 }
681 }
682}
683
/// Pairs of (alias path, underlying column) recorded for aliased leaf
/// dimension selections; consumed by the query parser.
pub type FieldAliasMap = Vec<(String, String)>;
687
/// A `quantile(of: …, level: …)` selection extracted from the query.
pub struct QuantileRequest {
    /// GraphQL alias for the result key (defaults to "quantile").
    pub alias: String,
    /// Flat dimension path to compute over; "*" when not specified.
    pub of_dimension: String,
    /// Quantile level (e.g. 0.95); extraction defaults to 0.5 when absent.
    pub level: f64,
}
694
/// A `calculate(expression: …)` selection extracted from the query.
pub struct CalculateRequest {
    /// GraphQL alias for the computed column (defaults to "calculate").
    pub alias: String,
    /// Raw expression string, passed through to the compiler as-is.
    pub expression: String,
}
700
/// A time-bucketing request: `field(interval: {in: unit, count: n})`.
pub struct TimeIntervalRequest {
    /// Underscore-joined dimension path of the bucketed field.
    pub field_path: String,
    /// GraphQL alias used for the bucketed value in the result.
    pub graphql_alias: String,
    /// Underlying table column for the dimension.
    pub column: String,
    /// Time unit name taken from the `TimeUnit` enum (e.g. "hours").
    pub unit: String,
    /// Number of units per bucket.
    pub count: i64,
}
710
/// An argMax/argMin-style dimension aggregation request, i.e.
/// `field(maximum: other)` or `field(minimum: other)`.
pub struct DimAggRequest {
    /// Underscore-joined dimension path of the selected value.
    pub field_path: String,
    /// GraphQL alias for the result (defaults to the full field path).
    pub graphql_alias: String,
    /// Column whose value is returned.
    pub value_column: String,
    /// ArgMax (`maximum:`) or ArgMin (`minimum:`).
    pub agg_type: DimAggType,
    /// Column whose extreme value picks the row.
    pub compare_column: String,
    /// Optional `if:` row filter applied to this aggregate only.
    pub condition_filter: Option<FilterNode>,
    /// Optional `selectWhere:` post-aggregation (HAVING) filter value.
    pub select_where_value: Option<async_graphql::Value>,
}
722
723fn extract_quantile_requests(
724 ctx: &async_graphql::dynamic::ResolverContext,
725) -> Vec<QuantileRequest> {
726 let mut requests = Vec::new();
727 for sub_field in ctx.ctx.field().selection_set() {
728 if sub_field.name() != "quantile" { continue; }
729 let args = match sub_field.arguments() {
730 Ok(a) => a,
731 Err(_) => continue,
732 };
733 let of_dim = args.iter()
734 .find(|(k, _)| k.as_str() == "of")
735 .and_then(|(_, v)| match v {
736 async_graphql::Value::Enum(e) => Some(e.to_string()),
737 async_graphql::Value::String(s) => Some(s.clone()),
738 _ => None,
739 })
740 .unwrap_or_else(|| "*".to_string());
741 let level = args.iter()
742 .find(|(k, _)| k.as_str() == "level")
743 .and_then(|(_, v)| match v {
744 async_graphql::Value::Number(n) => n.as_f64(),
745 _ => None,
746 })
747 .unwrap_or(0.5);
748 let alias = sub_field.alias().unwrap_or("quantile").to_string();
749 requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
750 }
751 requests
752}
753
754fn extract_field_aliases(
755 ctx: &async_graphql::dynamic::ResolverContext,
756 cube: &CubeDefinition,
757) -> FieldAliasMap {
758 let flat = cube.flat_dimensions();
759 let mut aliases = Vec::new();
760 collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
761 aliases
762}
763
/// Recursively records (alias path → column) pairs for aliased leaf
/// dimensions; metric, `calculate` and `quantile` selections are skipped.
///
/// Note: the recorded alias path keeps the *field-name* prefix of the parent
/// groups and substitutes the alias only in the leaf segment.
fn collect_field_aliases(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut FieldAliasMap,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
            continue;
        }
        // Underscore-joined path of field names, matching flat_dimensions().
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_field_aliases(&sub, &path, flat, metrics, out);
        } else if let Some(alias) = sub.alias() {
            // Only record aliases for paths that map to a known dimension.
            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                let alias_path = if prefix.is_empty() {
                    alias.to_string()
                } else {
                    format!("{prefix}_{alias}")
                };
                out.push((alias_path, dim.column.clone()));
            }
        }
    }
}
792
793fn extract_calculate_requests(
794 ctx: &async_graphql::dynamic::ResolverContext,
795) -> Vec<CalculateRequest> {
796 let mut requests = Vec::new();
797 for sub_field in ctx.ctx.field().selection_set() {
798 if sub_field.name() != "calculate" { continue; }
799 let args = match sub_field.arguments() {
800 Ok(a) => a,
801 Err(_) => continue,
802 };
803 let expression = args.iter()
804 .find(|(k, _)| k.as_str() == "expression")
805 .and_then(|(_, v)| match v {
806 async_graphql::Value::String(s) => Some(s.clone()),
807 _ => None,
808 });
809 if let Some(expr) = expression {
810 let alias = sub_field.alias().unwrap_or("calculate").to_string();
811 requests.push(CalculateRequest { alias, expression: expr });
812 }
813 }
814 requests
815}
816
817fn extract_dim_agg_requests(
818 ctx: &async_graphql::dynamic::ResolverContext,
819 cube: &CubeDefinition,
820) -> Vec<DimAggRequest> {
821 let flat = cube.flat_dimensions();
822 let mut requests = Vec::new();
823 collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
824 requests
825}
826
/// Recursively walks the selection set looking for leaf fields carrying a
/// `maximum:` or `minimum:` argument (argMax / argMin dimension aggregation).
///
/// A request is emitted only when both the selected field and the compared
/// field resolve to known flat dimension paths; otherwise the leaf is
/// silently skipped.
fn collect_dim_agg_paths(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    cube: &CubeDefinition,
    out: &mut Vec<DimAggRequest>,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        // Metric selections are handled elsewhere.
        if metrics.iter().any(|m| m.name == name) {
            continue;
        }
        // Underscore-joined path matching cube.flat_dimensions() keys.
        let path = if prefix.is_empty() {
            name.to_string()
        } else {
            format!("{prefix}_{name}")
        };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
        } else {
            let args = match sub.arguments() {
                Ok(a) => a,
                Err(_) => continue,
            };
            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");

            // `maximum` wins if both are given; the argument value names the
            // dimension whose extreme selects the row.
            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
                let cp = match v {
                    async_graphql::Value::Enum(e) => e.to_string(),
                    async_graphql::Value::String(s) => s.clone(),
                    _ => continue,
                };
                (DimAggType::ArgMax, cp)
            } else if let Some((_, v)) = min_val {
                let cp = match v {
                    async_graphql::Value::Enum(e) => e.to_string(),
                    async_graphql::Value::String(s) => s.clone(),
                    _ => continue,
                };
                (DimAggType::ArgMin, cp)
            } else {
                continue;
            };

            // Both sides must map to real columns.
            let value_column = flat.iter()
                .find(|(p, _)| p == &path)
                .map(|(_, dim)| dim.column.clone());
            let compare_column = flat.iter()
                .find(|(p, _)| p == &compare_path)
                .map(|(_, dim)| dim.column.clone());

            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
                // Optional `if:` filter (dropped when empty or unparseable).
                let condition_filter = args.iter()
                    .find(|(k, _)| k.as_str() == "if")
                    .and_then(|(_, v)| {
                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
                    });
                let select_where_value = args.iter()
                    .find(|(k, _)| k.as_str() == "selectWhere")
                    .map(|(_, v)| v.clone());

                // Default alias is the full underscore path, not the leaf name.
                let gql_alias = sub.alias()
                    .map(|a| a.to_string())
                    .unwrap_or_else(|| path.clone());
                out.push(DimAggRequest {
                    field_path: path,
                    graphql_alias: gql_alias,
                    value_column: vc,
                    agg_type,
                    compare_column: cc,
                    condition_filter,
                    select_where_value,
                });
            }
        }
    }
}
908
909fn extract_time_interval_requests(
910 ctx: &async_graphql::dynamic::ResolverContext,
911 cube: &CubeDefinition,
912) -> Vec<TimeIntervalRequest> {
913 let flat = cube.flat_dimensions();
914 let mut requests = Vec::new();
915 collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
916 requests
917}
918
/// Recursively walks the selection set looking for leaf fields with an
/// `interval: {in: unit, count: n}` argument and resolves each one to its
/// underlying dimension column.
fn collect_time_interval_paths(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut Vec<TimeIntervalRequest>,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        // Metric selections are handled elsewhere.
        if metrics.iter().any(|m| m.name == name) { continue; }
        // Underscore-joined path matching cube.flat_dimensions() keys.
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_time_interval_paths(&sub, &path, flat, metrics, out);
        } else {
            let args = match sub.arguments() {
                Ok(a) => a,
                Err(_) => continue,
            };
            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
                // Accept the unit as enum or string; count must be an integer.
                let unit = obj.get("in")
                    .and_then(|v| match v {
                        async_graphql::Value::Enum(e) => Some(e.to_string()),
                        async_graphql::Value::String(s) => Some(s.clone()),
                        _ => None,
                    });
                let count = obj.get("count")
                    .and_then(|v| match v {
                        async_graphql::Value::Number(n) => n.as_i64(),
                        _ => None,
                    });
                if let (Some(unit), Some(count)) = (unit, count) {
                    // Only emit for paths mapping to a known dimension.
                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                        // NOTE(review): default alias here is the leaf name,
                        // whereas dim-agg requests default to the full path —
                        // confirm this asymmetry is intentional.
                        let gql_alias = sub.alias().unwrap_or(name).to_string();
                        out.push(TimeIntervalRequest {
                            field_path: path,
                            graphql_alias: gql_alias,
                            column: dim.column.clone(),
                            unit,
                            count,
                        });
                    }
                }
            }
        }
    }
}
967
/// The GraphQL types generated for a single cube; `build_schema` registers
/// each list on the schema builder.
struct CubeTypes {
    /// Output object types (the cube's record type and related objects).
    objects: Vec<Object>,
    /// Input object types (filters, per-metric selectWhere inputs, …).
    inputs: Vec<InputObject>,
    /// Enum types (compare-field enums, per-metric `of` enums, …).
    enums: Vec<Enum>,
    /// Union types — presumably for array-field variants; see `build_cube_types`.
    unions: Vec<Union>,
}
978
979fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
980 let record_name = format!("{}Record", cube.name);
981 let filter_name = format!("{}Filter", cube.name);
982 let compare_enum_name = format!("{}CompareFields", cube.name);
983
984 let flat_dims = cube.flat_dimensions();
985
986 let mut compare_enum = Enum::new(&compare_enum_name)
987 .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
988 for (path, _) in &flat_dims {
989 compare_enum = compare_enum.item(EnumItem::new(path));
990 }
991
992 let mut record_fields: Vec<Field> = Vec::new();
993 let mut filter_fields: Vec<InputValue> = Vec::new();
994 let mut extra_objects: Vec<Object> = Vec::new();
995 let mut extra_inputs: Vec<InputObject> = Vec::new();
996 let mut extra_unions: Vec<Union> = Vec::new();
997
998 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
999 .description("OR combinator — matches if any sub-filter matches"));
1000
1001 {
1002 let mut collector = DimCollector {
1003 cube_name: &cube.name,
1004 compare_enum_name: &compare_enum_name,
1005 filter_name: &filter_name,
1006 record_fields: &mut record_fields,
1007 filter_fields: &mut filter_fields,
1008 extra_objects: &mut extra_objects,
1009 extra_inputs: &mut extra_inputs,
1010 extra_unions: &mut extra_unions,
1011 };
1012 for node in &cube.dimensions {
1013 collect_dimension_types(node, "", &mut collector);
1014 }
1015 }
1016
1017 let mut metric_enums: Vec<Enum> = Vec::new();
1018 let builtin_descs: std::collections::HashMap<&str, &str> = [
1019 ("count", "Count of rows or distinct values"),
1020 ("sum", "Sum of values"),
1021 ("avg", "Average of values"),
1022 ("min", "Minimum value"),
1023 ("max", "Maximum value"),
1024 ("uniq", "Count of unique (distinct) values"),
1025 ].into_iter().collect();
1026
1027 for metric in &cube.metrics {
1028 let metric_name = &metric.name;
1029 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1030
1031 if metric.supports_where {
1032 extra_inputs.push(
1033 InputObject::new(&select_where_name)
1034 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1035 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1036 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1037 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1038 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1039 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1040 );
1041 }
1042
1043 let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1044 let mut of_enum = Enum::new(&of_enum_name)
1045 .description(format!("Dimension to apply {} aggregation on", metric_name));
1046 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1047 metric_enums.push(of_enum);
1048
1049 let metric_name_clone = metric_name.clone();
1050 let return_type_ref = dim_type_to_typeref(&metric.return_type);
1051 let metric_desc = metric.description.as_deref()
1052 .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1053 .unwrap_or("Aggregate metric");
1054
1055 let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1056 let default_name = metric_name_clone.clone();
1057 FieldFuture::new(async move {
1058 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1059 let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1060 let key = metric_key(alias);
1061 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1062 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1063 })
1064 })
1065 .description(metric_desc)
1066 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1067 .description("Dimension to aggregate on (default: all rows)"));
1068
1069 if metric.supports_where {
1070 metric_field = metric_field
1071 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1072 .description("Post-aggregation filter (HAVING)"))
1073 .argument(InputValue::new("if", TypeRef::named(&filter_name))
1074 .description("Conditional filter for this metric"));
1075 }
1076
1077 record_fields.push(metric_field);
1078 }
1079
1080 {
1082 let of_enum_name = format!("{}_quantile_Of", cube.name);
1083 let mut of_enum = Enum::new(&of_enum_name)
1084 .description(format!("Dimension to apply quantile on for {}", cube.name));
1085 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1086 metric_enums.push(of_enum);
1087
1088 let of_enum_for_closure = of_enum_name.clone();
1089 let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1090 FieldFuture::new(async move {
1091 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1092 let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1093 let key = metric_key(alias);
1094 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1095 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1096 })
1097 })
1098 .description("Compute a quantile (percentile) of a dimension")
1099 .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1100 .description("Dimension to compute quantile on"))
1101 .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1102 .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1103 record_fields.push(quantile_field);
1104 }
1105
1106 {
1108 let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1109 FieldFuture::new(async move {
1110 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1111 let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1112 let key = metric_key(alias);
1113 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1114 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1115 })
1116 })
1117 .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1118 .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1119 .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1120 record_fields.push(calculate_field);
1121 }
1122
1123 for jd in &cube.joins {
1125 let target_record_name = format!("{}Record", jd.target_cube);
1126 let field_name_owned = jd.field_name.clone();
1127 let mut join_field = Field::new(
1128 &jd.field_name,
1129 TypeRef::named(&target_record_name),
1130 move |ctx| {
1131 let field_name = field_name_owned.clone();
1132 FieldFuture::new(async move {
1133 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1134 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1135 let sub_row: RowMap = obj.iter()
1136 .map(|(k, v)| (k.clone(), v.clone()))
1137 .collect();
1138 Ok(Some(FieldValue::owned_any(sub_row)))
1139 } else {
1140 Ok(Some(FieldValue::value(Value::Null)))
1141 }
1142 })
1143 },
1144 );
1145 if let Some(desc) = &jd.description {
1146 join_field = join_field.description(desc);
1147 }
1148 record_fields.push(join_field);
1149 }
1150
1151 let mut record = Object::new(&record_name);
1152 for f in record_fields { record = record.field(f); }
1153
1154 let mut filter = InputObject::new(&filter_name)
1155 .description(format!("Filter conditions for {} query", cube.name));
1156 for f in filter_fields { filter = filter.field(f); }
1157
1158 let orderby_input_name = format!("{}OrderByInput", cube.name);
1159 let orderby_input = InputObject::new(&orderby_input_name)
1160 .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1161 .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1162 .description("Sort descending by this field"))
1163 .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1164 .description("Sort ascending by this field"))
1165 .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1166 .description("Sort descending by computed/aggregated field name"))
1167 .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1168 .description("Sort ascending by computed/aggregated field name"));
1169
1170 let limitby_input_name = format!("{}LimitByInput", cube.name);
1171 let limitby_input = InputObject::new(&limitby_input_name)
1172 .description(format!("Per-group row limit for {}", cube.name))
1173 .field(InputValue::new("by", TypeRef::named_nn(&compare_enum_name))
1174 .description("Dimension field to group by"))
1175 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
1176 .description("Maximum rows per group"))
1177 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
1178 .description("Rows to skip per group"));
1179
1180 let mut objects = vec![record]; objects.extend(extra_objects);
1181 let mut inputs = vec![filter, orderby_input, limitby_input]; inputs.extend(extra_inputs);
1182 let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1183
1184 CubeTypes { objects, inputs, enums, unions: extra_unions }
1185}
1186
/// Mutable context threaded through `collect_dimension_types` while it
/// recurses over a cube's dimension tree. Generated GraphQL type
/// fragments are appended to the borrowed output vectors.
struct DimCollector<'a> {
    // Cube name, used as the prefix of every generated type name.
    cube_name: &'a str,
    // Name of the cube-wide compare enum (used by the leaf `maximum`/`minimum` args).
    compare_enum_name: &'a str,
    // Name of the cube's root filter input (used by the leaf `if` argument).
    filter_name: &'a str,
    // Fields accumulated for the cube's record object.
    record_fields: &'a mut Vec<Field>,
    // Input fields accumulated for the cube's filter input object.
    filter_fields: &'a mut Vec<InputValue>,
    // Extra nested record / element / union-variant objects to register.
    extra_objects: &'a mut Vec<Object>,
    // Extra nested filter / includes / array-wrapper inputs to register.
    extra_inputs: &'a mut Vec<InputObject>,
    // Union types generated for array union fields.
    extra_unions: &'a mut Vec<Union>,
}
1197
/// Recursively walks one node of a cube's dimension tree and registers the
/// GraphQL types it implies (record fields, filter inputs, nested record
/// objects, array element types and unions) into the collector `c`.
///
/// `prefix` is the underscore-joined path of ancestor group names; it keeps
/// generated type names (`{cube}_{path}_{suffix}`) globally unique.
fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
    match node {
        DimensionNode::Leaf(dim) => {
            let col = dim.column.clone();
            let is_datetime = dim.dim_type == DimType::DateTime;
            let compare_enum = c.compare_enum_name.to_string();
            let cube_filter = c.filter_name.to_string();
            let mut leaf_field = Field::new(
                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
                move |ctx| {
                    let col = col.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        // When any aggregation-style argument is present, the row
                        // stores the value under a dim-agg alias key instead of
                        // the raw column name.
                        let has_interval = ctx.args.try_get("interval").is_ok();
                        let has_max = ctx.args.try_get("maximum").is_ok();
                        let has_min = ctx.args.try_get("minimum").is_ok();
                        let key = if has_interval || has_max || has_min {
                            // Keyed by the field alias when one is given, else the column.
                            let name = ctx.ctx.field().alias().unwrap_or(&col);
                            dim_agg_key(name)
                        } else {
                            col.clone()
                        };
                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                        // DateTime columns get ISO-8601 normalization on the way out.
                        let gql_val = if is_datetime {
                            json_to_gql_datetime(val)
                        } else {
                            json_to_gql_value(val)
                        };
                        Ok(Some(FieldValue::value(gql_val)))
                    })
                },
            );
            if let Some(desc) = &dim.description {
                leaf_field = leaf_field.description(desc);
            }
            // Aggregation arguments available on every leaf dimension.
            leaf_field = leaf_field
                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is maximum (argMax)"))
                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is minimum (argMin)"))
                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
                    .description("Conditional filter for aggregation"))
                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
                    .description("Post-aggregation value filter (HAVING)"));
            // Time bucketing only makes sense for datetime dimensions.
            if is_datetime {
                leaf_field = leaf_field
                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
            }
            c.record_fields.push(leaf_field);
            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
        }
        DimensionNode::Group { graphql_name, description, children } => {
            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);

            let mut child_record_fields: Vec<Field> = Vec::new();
            let mut child_filter_fields: Vec<InputValue> = Vec::new();
            // NOTE(review): new_prefix duplicates the full_path computation above.
            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };

            // Child collector shares the cube-wide sinks but accumulates
            // record/filter fields into the nested type's own vectors.
            let mut child_collector = DimCollector {
                cube_name: c.cube_name,
                compare_enum_name: c.compare_enum_name,
                filter_name: c.filter_name,
                record_fields: &mut child_record_fields,
                filter_fields: &mut child_filter_fields,
                extra_objects: c.extra_objects,
                extra_inputs: c.extra_inputs,
                extra_unions: c.extra_unions,
            };
            for child in children {
                collect_dimension_types(child, &new_prefix, &mut child_collector);
            }

            let mut nested_record = Object::new(&nested_record_name);
            for f in child_record_fields { nested_record = nested_record.field(f); }

            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
            let mut nested_filter = InputObject::new(&nested_filter_name)
                .description(nested_filter_desc);
            for f in child_filter_fields { nested_filter = nested_filter.field(f); }

            // The group resolver just forwards a clone of the whole flat row;
            // child leaf resolvers pick out their own columns from it.
            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
                FieldFuture::new(async move {
                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                    Ok(Some(FieldValue::owned_any(row.clone())))
                })
            });
            if let Some(desc) = description {
                group_field = group_field.description(desc);
            }
            c.record_fields.push(group_field);
            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
            c.extra_objects.push(nested_record);
            c.extra_inputs.push(nested_filter);
        }
        DimensionNode::Array { graphql_name, description, children } => {
            let full_path = if prefix.is_empty() {
                graphql_name.clone()
            } else {
                format!("{prefix}_{graphql_name}")
            };
            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);

            let mut element_obj = Object::new(&element_type_name);
            let mut includes_filter = InputObject::new(&includes_filter_name)
                .description(format!("Element-level filter for {} (used with includes)", graphql_name));

            // (union name, union type, its variant objects) gathered for registration below.
            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();

            for child in children {
                match &child.field_type {
                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
                        let col_name = child.column.clone();
                        let is_datetime = *dt == DimType::DateTime;
                        let mut field = Field::new(
                            &child.graphql_name,
                            dim_type_to_typeref(dt),
                            move |ctx| {
                                let col = col_name.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    let gql_val = if is_datetime {
                                        json_to_gql_datetime(val)
                                    } else {
                                        json_to_gql_value(val)
                                    };
                                    Ok(Some(FieldValue::value(gql_val)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);
                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
                        );
                    }
                    crate::cube::definition::ArrayFieldType::Union(variants) => {
                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);

                        let mut gql_union = Union::new(&union_name);
                        let mut variant_objects = Vec::new();

                        for v in variants {
                            let variant_obj_name = v.type_name.clone();
                            let field_name = v.field_name.clone();
                            let source_type = v.source_type.clone();
                            let type_ref = dim_type_to_typeref(&source_type);

                            // Every variant object reads the same underlying column.
                            let val_col = child.column.clone();
                            let variant_obj = Object::new(&variant_obj_name)
                                .field(Field::new(
                                    &field_name,
                                    type_ref,
                                    move |ctx| {
                                        let col = val_col.clone();
                                        FieldFuture::new(async move {
                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
                                        })
                                    },
                                ));
                            gql_union = gql_union.possible_type(&variant_obj_name);
                            variant_objects.push(variant_obj);
                        }

                        let col_name = child.column.clone();
                        // Discriminator column comes from the sibling field named
                        // "Type"; empty string when no such sibling exists —
                        // TODO(review): confirm every union array defines one.
                        let type_col = children.iter()
                            .find(|f| f.graphql_name == "Type")
                            .map(|f| f.column.clone())
                            .unwrap_or_default();
                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();

                        let mut field = Field::new(
                            &child.graphql_name,
                            TypeRef::named(&union_name),
                            move |ctx| {
                                let col = col_name.clone();
                                let tcol = type_col.clone();
                                let vars = variants_clone.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    // Null or empty-string values resolve to GraphQL null.
                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
                                        return Ok(None);
                                    }
                                    let type_str = row.get(&tcol)
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("");
                                    let resolved_type = resolve_union_typename(type_str, &vars);
                                    // Wrap the single value in a one-entry row so the
                                    // chosen variant object's resolver can read it back
                                    // under the same column key.
                                    let mut elem = RowMap::new();
                                    elem.insert(col.clone(), raw_val);
                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);

                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
                        );

                        union_registrations.push((union_name, gql_union, variant_objects));
                    }
                }
            }

            for (_, union_type, variant_objs) in union_registrations {
                c.extra_objects.extend(variant_objs);
                c.extra_unions.push(union_type);
            }

            // (GraphQL name, source column) pairs used to zip the parallel arrays.
            let child_columns: Vec<(String, String)> = children.iter()
                .map(|f| (f.graphql_name.clone(), f.column.clone()))
                .collect();
            let element_type_name_clone = element_type_name.clone();

            let mut array_field = Field::new(
                graphql_name,
                TypeRef::named_nn_list_nn(&element_type_name),
                move |ctx| {
                    let cols = child_columns.clone();
                    let _etype = element_type_name_clone.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        // Fetch each child's parallel array (missing/non-array -> empty).
                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
                            .map(|(gql_name, col)| {
                                let arr = row.get(col)
                                    .and_then(|v| v.as_array())
                                    .cloned()
                                    .unwrap_or_default();
                                (gql_name.as_str(), arr)
                            })
                            .collect();

                        // Element count is taken from the first array only;
                        // assumes all parallel arrays have equal length —
                        // TODO(review): confirm upstream guarantees this.
                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
                        let mut elements = Vec::with_capacity(len);
                        for i in 0..len {
                            let mut elem = RowMap::new();
                            // Insert under the GraphQL field name...
                            for (gql_name, arr) in &arrays {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(gql_name.to_string(), val);
                            }
                            // ...and under the raw column name, so resolvers that
                            // look values up by column (e.g. union variants) work too.
                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(col.clone(), val);
                            }
                            elements.push(FieldValue::owned_any(elem));
                        }
                        Ok(Some(FieldValue::list(elements)))
                    })
                },
            );
            if let Some(desc) = description {
                array_field = array_field.description(desc);
            }
            c.record_fields.push(array_field);

            // Wrapper input exposing the element-level filter via `includes`.
            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
            let wrapper_filter = InputObject::new(&wrapper_filter_name)
                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
                    .description("Match rows where at least one array element satisfies all conditions"));
            c.extra_inputs.push(wrapper_filter);

            c.filter_fields.push(InputValue::new(
                graphql_name,
                TypeRef::named(&wrapper_filter_name),
            ));

            c.extra_objects.push(element_obj);
            c.extra_inputs.push(includes_filter);
        }
    }
}
1497
1498fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1504 for v in variants {
1505 if v.source_type_names.iter().any(|s| s == type_str) {
1506 return v.type_name.clone();
1507 }
1508 }
1509 variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1510}
1511
1512fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1513 match dt {
1514 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
1515 DimType::Int => TypeRef::named(TypeRef::INT),
1516 DimType::Float => TypeRef::named(TypeRef::FLOAT),
1517 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1518 }
1519}
1520
1521fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1522 match dt {
1523 DimType::String => "StringFilter",
1524 DimType::Int => "IntFilter",
1525 DimType::Float => "FloatFilter",
1526 DimType::DateTime => "DateTimeFilter",
1527 DimType::Bool => "BoolFilter",
1528 }
1529}
1530
1531pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1532 match v {
1533 serde_json::Value::Null => Value::Null,
1534 serde_json::Value::Bool(b) => Value::from(b),
1535 serde_json::Value::Number(n) => {
1536 if let Some(i) = n.as_i64() { Value::from(i) }
1537 else if let Some(f) = n.as_f64() { Value::from(f) }
1538 else { Value::from(n.to_string()) }
1539 }
1540 serde_json::Value::String(s) => Value::from(s),
1541 _ => Value::from(v.to_string()),
1542 }
1543}
1544
/// Translates one join definition into the IR `JoinExpr` for a query.
///
/// * `jd` — the join definition (conditions, target cube, join type).
/// * `target_cube` — definition of the cube being joined to.
/// * `sub_field` — the GraphQL selection under the join field; only the
///   dimensions actually requested are projected into the join subquery.
/// * `network` — chain/network id used to resolve the target table name.
/// * `join_idx` — index used to generate a unique SQL alias (`_j{n}`).
fn build_join_expr(
    jd: &crate::cube::definition::JoinDef,
    target_cube: &CubeDefinition,
    sub_field: &async_graphql::SelectionField<'_>,
    network: &str,
    join_idx: usize,
) -> JoinExpr {
    let target_flat = target_cube.flat_dimensions();
    let target_table = target_cube.table_for_chain(network);

    // Gather the dimension paths requested in the sub-selection so the
    // projection can be narrowed to only the needed columns.
    let mut requested_paths = HashSet::new();
    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);

    let mut selects: Vec<SelectExpr> = target_flat.iter()
        .filter(|(path, _)| requested_paths.contains(path))
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // Fallback: if nothing matched the selection, select every dimension
    // of the target cube rather than producing an empty projection.
    if selects.is_empty() {
        selects = target_flat.iter()
            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
            .collect();
    }

    // Heuristic: a '(' in any column expression marks an aggregate call,
    // which forces the join subquery to GROUP BY.
    let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));

    let group_by = if is_aggregate {
        // Group by the join keys (right-hand side of each condition) plus
        // every selected non-aggregate column, deduplicated.
        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
        for sel in &selects {
            if let SelectExpr::Column { column, .. } = sel {
                if !column.contains('(') && !gb.contains(column) {
                    gb.push(column.clone());
                }
            }
        }
        gb
    } else {
        vec![]
    };

    JoinExpr {
        schema: target_cube.schema.clone(),
        table: target_table,
        alias: format!("_j{}", join_idx),
        conditions: jd.conditions.clone(),
        selects,
        group_by,
        use_final: target_cube.use_final,
        is_aggregate,
        target_cube: jd.target_cube.clone(),
        join_field: sub_field.name().to_string(),
        join_type: jd.join_type.clone(),
    }
}
1604
1605fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1608 match v {
1609 serde_json::Value::String(s) => {
1610 let iso = if s.contains('T') {
1611 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1612 } else {
1613 let replaced = s.replacen(' ', "T", 1);
1614 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1615 };
1616 Value::from(iso)
1617 }
1618 other => json_to_gql_value(other),
1619 }
1620}