1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Row-map key under which a metric's aggregated value is stored,
/// prefixed to avoid collisions with dimension columns.
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Row-map key for a dimension-aggregation (argMax/argMin) result.
pub fn dim_agg_key(alias: &str) -> String {
    ["__da_", alias].concat()
}
20
/// Async SQL executor supplied by the host application: given a SQL string
/// and its bound parameters, resolves to result rows or an error message.
/// Boxed + pinned so it can be stored and cloned (`Arc`) across resolvers.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// An extra GraphQL argument exposed on a chain-group wrapper field.
#[derive(Debug, Clone)]
pub struct WrapperArg {
    // GraphQL argument name; resolved values land in `ChainContext::extra` under this key.
    pub name: String,
    // GraphQL type reference for the argument (type name as a string).
    pub type_ref: String,
    // Human-readable description shown in the schema.
    pub description: String,
    // NOTE(review): not read anywhere in this file — presumably the allowed
    // value set used when registering an enum type elsewhere; confirm.
    pub values: Option<Vec<String>>,
}
43
/// Configuration for one chain-group wrapper object on the root query.
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    // Wrapper field and GraphQL type name (e.g. "EVM").
    pub name: String,
    // Which cube chain group this wrapper serves.
    pub group: ChainGroup,
    // Networks available under this group; the first entry is the default
    // when no `network` argument is given.
    pub networks: Vec<String>,
    // When true, a `{name}Network` enum is registered and the wrapper field
    // accepts a required `network` argument.
    pub has_network_arg: bool,
    // Additional wrapper-level arguments, forwarded into `ChainContext::extra`.
    pub extra_args: Vec<WrapperArg>,
}
60
/// Hook to rewrite the resolved table name per request, given the chain
/// context (e.g. to route to a network-specific table).
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
65
/// Top-level options for `build_schema`.
pub struct SchemaConfig {
    // Chain-group wrappers to expose on the root query.
    pub chain_groups: Vec<ChainGroupConfig>,
    // NOTE(review): not read in this file — `Schema::build` below hardcodes
    // "Query" as the root name; confirm intended use.
    pub root_query_name: String,
    // Fallback per-query stats callback; a callback in schema data takes precedence.
    pub stats_callback: Option<StatsCallback>,
    // Optional per-request table-name rewrite, stored in schema data.
    pub table_name_transform: Option<TableNameTransform>,
    // Extra enum types registered verbatim into the schema.
    pub extra_types: Vec<Enum>,
}
78
79impl Default for SchemaConfig {
80 fn default() -> Self {
81 Self {
82 chain_groups: vec![
83 ChainGroupConfig {
84 name: "EVM".to_string(),
85 group: ChainGroup::Evm,
86 networks: vec!["eth".into(), "bsc".into()],
87 has_network_arg: true,
88 extra_args: vec![],
89 },
90 ChainGroupConfig {
91 name: "Solana".to_string(),
92 group: ChainGroup::Solana,
93 networks: vec!["sol".into()],
94 has_network_arg: false,
95 extra_args: vec![],
96 },
97 ChainGroupConfig {
98 name: "Trading".to_string(),
99 group: ChainGroup::Trading,
100 networks: vec!["sol".into(), "eth".into(), "bsc".into()],
101 has_network_arg: false,
102 extra_args: vec![],
103 },
104 ],
105 root_query_name: "ChainStream".to_string(),
106 stats_callback: None,
107 table_name_transform: None,
108 extra_types: vec![],
109 }
110 }
111}
112
/// Per-request context produced by a chain-group wrapper resolver and read
/// (via parent-value downcast) by the cube resolvers beneath it.
pub struct ChainContext {
    // Selected network; defaults to the group's first configured network.
    pub network: String,
    // Stringified extra wrapper arguments, keyed by argument name.
    pub extra: HashMap<String, String>,
}
120
/// Builds the dynamic GraphQL schema from the cube registry.
///
/// Registers shared filter/pagination/enum primitives, all per-cube types,
/// one wrapper object per chain group, and the root `Query` object. The
/// `registry` (and optional table-name transform) are stored in schema data
/// so resolvers can reach them at request time.
///
/// Returns the finished schema, or the builder's `SchemaError` if any
/// registered type is invalid.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    let mut builder = Schema::build("Query", None, None);

    // Shared input/enum primitives used by every cube field.
    builder = builder.register(filter_types::build_limit_input());
    builder = builder.register(
        InputObject::new("LimitByInput")
            .description("Limit results per group (similar to ClickHouse LIMIT BY)")
            .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
                .description("Comma-separated dimension names to group by"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
                .description("Maximum rows per group"))
            .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
                .description("Rows to skip per group")),
    );
    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }
    builder = builder.register(
        Enum::new("TimeUnit")
            .description("Time unit for interval bucketing")
            .item(EnumItem::new("seconds"))
            .item(EnumItem::new("minutes"))
            .item(EnumItem::new("hours"))
            .item(EnumItem::new("days"))
            .item(EnumItem::new("weeks"))
            .item(EnumItem::new("months")),
    );
    builder = builder.register(
        InputObject::new("TimeIntervalInput")
            .description("Time bucketing interval for DateTime dimensions")
            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
    );
    builder = builder.register(
        InputObject::new("DimSelectWhere")
            .description("Post-aggregation filter for dimension values (HAVING clause)")
            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
    );

    // Caller-supplied enum types (moves `extra_types` out of `config`).
    for extra_enum in config.extra_types {
        builder = builder.register(extra_enum);
    }

    // One `{Group}Network` selector enum per group that takes a network argument.
    for grp in &config.chain_groups {
        if grp.has_network_arg {
            let enum_name = format!("{}Network", grp.name);
            let mut net_enum = Enum::new(&enum_name)
                .description(format!("{} network selector", grp.name));
            for net in &grp.networks {
                net_enum = net_enum.item(EnumItem::new(net));
            }
            builder = builder.register(net_enum);
        }
    }

    // Register each cube's generated types exactly once, even if the
    // registry yields the same cube name more than once.
    let mut registered_cubes: HashSet<String> = HashSet::new();
    for cube in registry.cubes() {
        if registered_cubes.contains(&cube.name) { continue; }
        registered_cubes.insert(cube.name.clone());

        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }
        for un in types.unions { builder = builder.register(un); }
    }

    let mut query = Object::new("Query");

    // One wrapper object + root field per chain group; the wrapper resolver
    // yields a ChainContext that child cube resolvers downcast.
    for grp in &config.chain_groups {
        let wrapper_type_name = grp.name.clone();

        let mut wrapper_obj = Object::new(&wrapper_type_name);

        // Attach only the cubes that belong to this chain group.
        for cube in registry.cubes() {
            if !cube.chain_groups.contains(&grp.group) { continue; }

            let cube_field = build_cube_field(
                cube,
                dialect.clone(),
                executor.clone(),
                config.stats_callback.clone(),
            );
            wrapper_obj = wrapper_obj.field(cube_field);
        }
        builder = builder.register(wrapper_obj);

        // First configured network is the default when no arg is supplied.
        let default_network = grp.networks.first().cloned().unwrap_or_default();
        let has_net_arg = grp.has_network_arg;
        let net_enum_name = format!("{}Network", grp.name);
        let wrapper_type_for_resolver = wrapper_type_name.clone();
        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();

        let mut wrapper_field = Field::new(
            &wrapper_type_name,
            TypeRef::named_nn(&wrapper_type_name),
            move |ctx| {
                // Clone captured state per invocation; the async block takes ownership.
                let default = default_network.clone();
                let has_arg = has_net_arg;
                let arg_names = extra_arg_names.clone();
                FieldFuture::new(async move {
                    let network = if has_arg {
                        ctx.args.try_get("network")
                            .ok()
                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
                            .unwrap_or(default)
                    } else {
                        default
                    };
                    // Stringify extra args (enum name, bool, or string — in that order).
                    let mut extra = HashMap::new();
                    for arg_name in &arg_names {
                        if let Ok(val) = ctx.args.try_get(arg_name) {
                            let resolved = val.enum_name().ok().map(|s| s.to_string())
                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
                                .or_else(|| val.string().ok().map(|s| s.to_string()));
                            if let Some(v) = resolved {
                                extra.insert(arg_name.clone(), v);
                            }
                        }
                    }
                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
                })
            },
        );

        if grp.has_network_arg {
            wrapper_field = wrapper_field.argument(
                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
                    .description(format!("{} network to query", wrapper_type_for_resolver)),
            );
        }
        for wa in &grp.extra_args {
            wrapper_field = wrapper_field.argument(
                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
                    .description(&wa.description),
            );
        }

        query = query.field(wrapper_field);
    }

    // Introspection helper: `_cubeMetadata` returns a JSON string describing
    // every cube, grouped by chain group.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
        .map(|g| (g.name.clone(), g.group.clone()))
        .collect();
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            let groups = metadata_groups.clone();
            FieldFuture::new(async move {
                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
                for (group_name, group_enum) in &groups {
                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
                        .filter(|c| c.chain_groups.contains(group_enum))
                        .map(serialize_cube_metadata)
                        .collect();
                    group_metadata.push(serde_json::json!({
                        "group": group_name,
                        "cubes": cubes_in_group,
                    }));
                }
                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes grouped by chain");
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Make the registry available to cube resolvers via ctx data.
    builder = builder.data(registry);

    if let Some(transform) = config.table_name_transform.clone() {
        builder = builder.data(transform);
    }

    builder.finish()
}
336
/// Serializes one cube definition to the JSON shape returned by the
/// `_cubeMetadata` field. Optional metric fields (`expressionTemplate`,
/// `description`) are included only when present. Key names here are a
/// public contract for metadata consumers — do not rename casually.
fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
    serde_json::json!({
        "name": cube.name,
        "description": cube.description,
        "schema": cube.schema,
        "tablePattern": cube.table_pattern,
        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
        "metrics": cube.metrics.iter().map(|m| {
            let mut obj = serde_json::json!({
                "name": m.name,
                "returnType": format!("{:?}", m.return_type),
            });
            if let Some(ref tmpl) = m.expression_template {
                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
            }
            if let Some(ref desc) = m.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        }).collect::<Vec<_>>(),
        "selectors": cube.selectors.iter().map(|s| {
            serde_json::json!({
                "name": s.graphql_name,
                "column": s.column,
                "type": format!("{:?}", s.dim_type),
            })
        }).collect::<Vec<_>>(),
        // Dimensions are nested (groups/arrays), so they get a recursive helper.
        "dimensions": serialize_dims(&cube.dimensions),
        "joins": cube.joins.iter().map(|j| {
            serde_json::json!({
                "field": j.field_name,
                "target": j.target_cube,
                "joinType": format!("{:?}", j.join_type),
            })
        }).collect::<Vec<_>>(),
        "tableRoutes": cube.table_routes.iter().map(|r| {
            serde_json::json!({
                "schema": r.schema,
                "tablePattern": r.table_pattern,
                "availableColumns": r.available_columns,
                "priority": r.priority,
            })
        }).collect::<Vec<_>>(),
        "defaultLimit": cube.default_limit,
        "maxLimit": cube.max_limit,
    })
}
384
/// Builds the GraphQL field for one cube on a chain-group wrapper object.
///
/// The resolver pipeline is: read ChainContext from the parent value →
/// extract metric/quantile/calculate/alias/dim-agg/time-interval requests
/// from the selection set → parse into IR → attach joins → optionally
/// rewrite the table name → validate → compile to SQL → execute → reshape
/// rows (alias remap, join sub-objects) → report stats → return row values.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            // Per-invocation clones; the async block below takes ownership.
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // Parent is the chain-group wrapper's ChainContext (if any).
                // NOTE(review): falls back to "sol" when no context is present — confirm intended default.
                let chain_ctx = ctx.parent_value
                    .try_downcast_ref::<ChainContext>().ok();
                let network = chain_ctx
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Gather everything the parser needs from the GraphQL selection set.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // Selected sub-fields matching a declared join become join expressions.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                // Optional host-supplied table-name rewrite (needs ChainContext).
                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
                    if let Some(cctx) = chain_ctx {
                        ir.table = transform(&ir.table, cctx);
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Undo dialect-level alias renaming: move values back under their
                // original keys (keeping an existing value if the key is taken).
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Fold flat `alias.column` keys from joins into nested JSON
                // objects under the join's GraphQL field name.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // A stats callback in schema data wins over the field-level one.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    // Standard per-cube arguments: filter, pagination, per-group limit, ordering.
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // Shorthand top-level filter argument for each declared selector dimension.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
540
/// Recursively serializes a dimension tree (leaves, groups, arrays) for the
/// `_cubeMetadata` payload. `description` keys appear only when set.
fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
    serde_json::Value::Array(dims.iter().map(|d| match d {
        // Plain column-backed dimension.
        DimensionNode::Leaf(dim) => {
            let mut obj = serde_json::json!({
                "name": dim.graphql_name,
                "column": dim.column,
                "type": format!("{:?}", dim.dim_type),
            });
            if let Some(desc) = &dim.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        // Named group of nested dimensions.
        DimensionNode::Group { graphql_name, description, children } => {
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "children": serialize_dims(children),
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        // Array dimension: each child field is either a scalar or a union of variants.
        DimensionNode::Array { graphql_name, description, children } => {
            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
                let type_value = match &f.field_type {
                    crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
                        "kind": "scalar",
                        "scalarType": format!("{:?}", dt),
                    }),
                    crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
                        "kind": "union",
                        "variants": variants.iter().map(|v| serde_json::json!({
                            "typeName": v.type_name,
                            "fieldName": v.field_name,
                            "sourceType": format!("{:?}", v.source_type),
                        })).collect::<Vec<_>>(),
                    }),
                };
                let mut field_obj = serde_json::json!({
                    "name": f.graphql_name,
                    "column": f.column,
                    "type": type_value,
                });
                if let Some(desc) = &f.description {
                    field_obj["description"] = serde_json::Value::String(desc.clone());
                }
                field_obj
            }).collect();
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "kind": "array",
                "fields": fields,
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
    }).collect())
}
602
/// Collects one `MetricRequest` per selected top-level metric field,
/// reading its `of` (target dimension, default `*`), `selectWhere`
/// (post-aggregation filter, kept raw) and `if` (pre-aggregation condition)
/// arguments.
fn extract_metric_requests(
    ctx: &async_graphql::dynamic::ResolverContext,
    cube: &CubeDefinition,
) -> Vec<compiler::parser::MetricRequest> {
    let mut requests = Vec::new();

    for sub_field in ctx.ctx.field().selection_set() {
        let name = sub_field.name();
        // Only fields that are declared metrics on this cube count.
        if !cube.has_metric(name) {
            continue;
        }

        // Fields whose arguments fail to resolve are skipped silently.
        let args = match sub_field.arguments() {
            Ok(args) => args,
            Err(_) => continue,
        };

        // `of` may arrive as an enum value or a plain string; "*" means all rows.
        let of_dimension = args
            .iter()
            .find(|(k, _)| k.as_str() == "of")
            .and_then(|(_, v)| match v {
                async_graphql::Value::Enum(e) => Some(e.to_string()),
                async_graphql::Value::String(s) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_else(|| "*".to_string());

        // Kept as a raw GraphQL value; interpreted later by the parser.
        let select_where_value = args
            .iter()
            .find(|(k, _)| k.as_str() == "selectWhere")
            .map(|(_, v)| v.clone());

        // `if` is parsed immediately; empty or unparsable filters become None.
        let condition_filter = args
            .iter()
            .find(|(k, _)| k.as_str() == "if")
            .and_then(|(_, v)| {
                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
            });

        // GraphQL alias (or the metric name) keys the value in the result row.
        let gql_alias = sub_field.alias().unwrap_or(name).to_string();
        requests.push(compiler::parser::MetricRequest {
            function: name.to_string(),
            alias: gql_alias,
            of_dimension,
            select_where_value,
            condition_filter,
        });
    }

    requests
}
658
659fn extract_requested_fields(
660 ctx: &async_graphql::dynamic::ResolverContext,
661 cube: &CubeDefinition,
662) -> HashSet<String> {
663 let mut fields = HashSet::new();
664 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
665 fields
666}
667
668fn collect_selection_paths(
669 field: &async_graphql::SelectionField<'_>,
670 prefix: &str,
671 out: &mut HashSet<String>,
672 metrics: &[MetricDef],
673) {
674 for sub in field.selection_set() {
675 let name = sub.name();
676 if metrics.iter().any(|m| m.name == name) {
677 continue;
678 }
679 let path = if prefix.is_empty() {
680 name.to_string()
681 } else {
682 format!("{prefix}_{name}")
683 };
684 let has_children = sub.selection_set().next().is_some();
685 if has_children {
686 collect_selection_paths(&sub, &path, out, metrics);
687 } else {
688 out.insert(path);
689 }
690 }
691}
692
/// Pairs of (aliased field path, underlying column name) for leaf selections
/// that carry a GraphQL alias.
pub type FieldAliasMap = Vec<(String, String)>;
696
/// A requested `quantile` aggregation parsed from the selection set.
pub struct QuantileRequest {
    // GraphQL alias (or "quantile") keying the result value.
    pub alias: String,
    // Dimension to compute the quantile over; "*" when unspecified.
    pub of_dimension: String,
    // Quantile level in [0, 1]; defaults to 0.5 (median).
    pub level: f64,
}
703
/// A requested `calculate` field: a caller-supplied expression to evaluate.
pub struct CalculateRequest {
    // GraphQL alias (or "calculate") keying the result value.
    pub alias: String,
    // Raw expression string from the `expression` argument.
    pub expression: String,
}
709
/// A time-bucketing request (`interval` argument) on a DateTime dimension.
pub struct TimeIntervalRequest {
    // Underscore-joined selection path of the dimension.
    pub field_path: String,
    // GraphQL alias (or field name) keying the bucketed value.
    pub graphql_alias: String,
    // Backing database column of the dimension.
    pub column: String,
    // Time unit ("seconds".."months", per the TimeUnit enum).
    pub unit: String,
    // Number of units per bucket.
    pub count: i64,
}
719
/// A dimension-aggregation request: argMax/argMin of one column relative to
/// another, parsed from a leaf field's `maximum`/`minimum` arguments.
pub struct DimAggRequest {
    // Underscore-joined selection path of the value dimension.
    pub field_path: String,
    // GraphQL alias (or the path) keying the result value.
    pub graphql_alias: String,
    // Column whose value is returned.
    pub value_column: String,
    // ArgMax (from `maximum`) or ArgMin (from `minimum`).
    pub agg_type: DimAggType,
    // Column the max/min comparison runs over.
    pub compare_column: String,
    // Optional pre-aggregation `if` filter (None when empty/unparsable).
    pub condition_filter: Option<FilterNode>,
    // Raw `selectWhere` value, interpreted downstream.
    pub select_where_value: Option<async_graphql::Value>,
}
731
732fn extract_quantile_requests(
733 ctx: &async_graphql::dynamic::ResolverContext,
734) -> Vec<QuantileRequest> {
735 let mut requests = Vec::new();
736 for sub_field in ctx.ctx.field().selection_set() {
737 if sub_field.name() != "quantile" { continue; }
738 let args = match sub_field.arguments() {
739 Ok(a) => a,
740 Err(_) => continue,
741 };
742 let of_dim = args.iter()
743 .find(|(k, _)| k.as_str() == "of")
744 .and_then(|(_, v)| match v {
745 async_graphql::Value::Enum(e) => Some(e.to_string()),
746 async_graphql::Value::String(s) => Some(s.clone()),
747 _ => None,
748 })
749 .unwrap_or_else(|| "*".to_string());
750 let level = args.iter()
751 .find(|(k, _)| k.as_str() == "level")
752 .and_then(|(_, v)| match v {
753 async_graphql::Value::Number(n) => n.as_f64(),
754 _ => None,
755 })
756 .unwrap_or(0.5);
757 let alias = sub_field.alias().unwrap_or("quantile").to_string();
758 requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
759 }
760 requests
761}
762
763fn extract_field_aliases(
764 ctx: &async_graphql::dynamic::ResolverContext,
765 cube: &CubeDefinition,
766) -> FieldAliasMap {
767 let flat = cube.flat_dimensions();
768 let mut aliases = Vec::new();
769 collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
770 aliases
771}
772
/// Recursive worker for `extract_field_aliases`: records (alias path,
/// column) for each aliased leaf that matches a known flat dimension path.
/// Note the alias path keeps the *parent* prefix but swaps the leaf segment
/// for the alias.
fn collect_field_aliases(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut FieldAliasMap,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        // Skip aggregations and the special calculate/quantile pseudo-fields.
        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
            continue;
        }
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_field_aliases(&sub, &path, flat, metrics, out);
        } else if let Some(alias) = sub.alias() {
            // Only record aliases on real dimension leaves.
            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                let alias_path = if prefix.is_empty() {
                    alias.to_string()
                } else {
                    format!("{prefix}_{alias}")
                };
                out.push((alias_path, dim.column.clone()));
            }
        }
    }
}
801
802fn extract_calculate_requests(
803 ctx: &async_graphql::dynamic::ResolverContext,
804) -> Vec<CalculateRequest> {
805 let mut requests = Vec::new();
806 for sub_field in ctx.ctx.field().selection_set() {
807 if sub_field.name() != "calculate" { continue; }
808 let args = match sub_field.arguments() {
809 Ok(a) => a,
810 Err(_) => continue,
811 };
812 let expression = args.iter()
813 .find(|(k, _)| k.as_str() == "expression")
814 .and_then(|(_, v)| match v {
815 async_graphql::Value::String(s) => Some(s.clone()),
816 _ => None,
817 });
818 if let Some(expr) = expression {
819 let alias = sub_field.alias().unwrap_or("calculate").to_string();
820 requests.push(CalculateRequest { alias, expression: expr });
821 }
822 }
823 requests
824}
825
826fn extract_dim_agg_requests(
827 ctx: &async_graphql::dynamic::ResolverContext,
828 cube: &CubeDefinition,
829) -> Vec<DimAggRequest> {
830 let flat = cube.flat_dimensions();
831 let mut requests = Vec::new();
832 collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
833 requests
834}
835
/// Recursive worker for `extract_dim_agg_requests`. A leaf with a `maximum`
/// argument becomes ArgMax, `minimum` becomes ArgMin (`maximum` wins when
/// both are present). Both the value path and the compare path must resolve
/// to known flat dimensions, otherwise the request is dropped.
fn collect_dim_agg_paths(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    cube: &CubeDefinition,
    out: &mut Vec<DimAggRequest>,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        if metrics.iter().any(|m| m.name == name) {
            continue;
        }
        let path = if prefix.is_empty() {
            name.to_string()
        } else {
            format!("{prefix}_{name}")
        };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
        } else {
            let args = match sub.arguments() {
                Ok(a) => a,
                Err(_) => continue,
            };
            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");

            // The argument value names the dimension path to compare over;
            // non-enum/non-string values abandon this leaf entirely.
            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
                let cp = match v {
                    async_graphql::Value::Enum(e) => e.to_string(),
                    async_graphql::Value::String(s) => s.clone(),
                    _ => continue,
                };
                (DimAggType::ArgMax, cp)
            } else if let Some((_, v)) = min_val {
                let cp = match v {
                    async_graphql::Value::Enum(e) => e.to_string(),
                    async_graphql::Value::String(s) => s.clone(),
                    _ => continue,
                };
                (DimAggType::ArgMin, cp)
            } else {
                continue;
            };

            // Resolve both paths to backing columns via the flat dimension list.
            let value_column = flat.iter()
                .find(|(p, _)| p == &path)
                .map(|(_, dim)| dim.column.clone());
            let compare_column = flat.iter()
                .find(|(p, _)| p == &compare_path)
                .map(|(_, dim)| dim.column.clone());

            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
                // Optional `if` filter; empty/unparsable filters become None.
                let condition_filter = args.iter()
                    .find(|(k, _)| k.as_str() == "if")
                    .and_then(|(_, v)| {
                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
                    });
                let select_where_value = args.iter()
                    .find(|(k, _)| k.as_str() == "selectWhere")
                    .map(|(_, v)| v.clone());

                let gql_alias = sub.alias()
                    .map(|a| a.to_string())
                    .unwrap_or_else(|| path.clone());
                out.push(DimAggRequest {
                    field_path: path,
                    graphql_alias: gql_alias,
                    value_column: vc,
                    agg_type,
                    compare_column: cc,
                    condition_filter,
                    select_where_value,
                });
            }
        }
    }
}
917
918fn extract_time_interval_requests(
919 ctx: &async_graphql::dynamic::ResolverContext,
920 cube: &CubeDefinition,
921) -> Vec<TimeIntervalRequest> {
922 let flat = cube.flat_dimensions();
923 let mut requests = Vec::new();
924 collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
925 requests
926}
927
/// Recursive worker for `extract_time_interval_requests`: a leaf with an
/// `interval: { in, count }` object argument on a known dimension path
/// becomes a `TimeIntervalRequest`. Both `in` and `count` must be present
/// and well-typed, and the path must resolve to a flat dimension.
fn collect_time_interval_paths(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut Vec<TimeIntervalRequest>,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        if metrics.iter().any(|m| m.name == name) { continue; }
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_time_interval_paths(&sub, &path, flat, metrics, out);
        } else {
            let args = match sub.arguments() {
                Ok(a) => a,
                Err(_) => continue,
            };
            // Only an object-valued `interval` argument is considered.
            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
                // `in` may be the TimeUnit enum or a raw string.
                let unit = obj.get("in")
                    .and_then(|v| match v {
                        async_graphql::Value::Enum(e) => Some(e.to_string()),
                        async_graphql::Value::String(s) => Some(s.clone()),
                        _ => None,
                    });
                let count = obj.get("count")
                    .and_then(|v| match v {
                        async_graphql::Value::Number(n) => n.as_i64(),
                        _ => None,
                    });
                if let (Some(unit), Some(count)) = (unit, count) {
                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                        let gql_alias = sub.alias().unwrap_or(name).to_string();
                        out.push(TimeIntervalRequest {
                            field_path: path,
                            graphql_alias: gql_alias,
                            column: dim.column.clone(),
                            unit,
                            count,
                        });
                    }
                }
            }
        }
    }
}
976
/// All GraphQL types generated for a single cube, registered en bloc by
/// `build_schema`.
struct CubeTypes {
    // Record objects (and any nested group/array objects).
    objects: Vec<Object>,
    // Filter and selectWhere input objects.
    inputs: Vec<InputObject>,
    // Per-metric `of` enums, compare-field enums, etc.
    enums: Vec<Enum>,
    // Union types for array fields with variant payloads.
    unions: Vec<Union>,
}
987
988fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
989 let record_name = format!("{}Record", cube.name);
990 let filter_name = format!("{}Filter", cube.name);
991 let compare_enum_name = format!("{}CompareFields", cube.name);
992
993 let flat_dims = cube.flat_dimensions();
994
995 let mut compare_enum = Enum::new(&compare_enum_name)
996 .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
997 for (path, _) in &flat_dims {
998 compare_enum = compare_enum.item(EnumItem::new(path));
999 }
1000
1001 let mut record_fields: Vec<Field> = Vec::new();
1002 let mut filter_fields: Vec<InputValue> = Vec::new();
1003 let mut extra_objects: Vec<Object> = Vec::new();
1004 let mut extra_inputs: Vec<InputObject> = Vec::new();
1005 let mut extra_unions: Vec<Union> = Vec::new();
1006
1007 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
1008 .description("OR combinator — matches if any sub-filter matches"));
1009
1010 {
1011 let mut collector = DimCollector {
1012 cube_name: &cube.name,
1013 compare_enum_name: &compare_enum_name,
1014 filter_name: &filter_name,
1015 record_fields: &mut record_fields,
1016 filter_fields: &mut filter_fields,
1017 extra_objects: &mut extra_objects,
1018 extra_inputs: &mut extra_inputs,
1019 extra_unions: &mut extra_unions,
1020 };
1021 for node in &cube.dimensions {
1022 collect_dimension_types(node, "", &mut collector);
1023 }
1024 }
1025
1026 let mut metric_enums: Vec<Enum> = Vec::new();
1027 let builtin_descs: std::collections::HashMap<&str, &str> = [
1028 ("count", "Count of rows or distinct values"),
1029 ("sum", "Sum of values"),
1030 ("avg", "Average of values"),
1031 ("min", "Minimum value"),
1032 ("max", "Maximum value"),
1033 ("uniq", "Count of unique (distinct) values"),
1034 ].into_iter().collect();
1035
1036 for metric in &cube.metrics {
1037 let metric_name = &metric.name;
1038 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1039
1040 if metric.supports_where {
1041 extra_inputs.push(
1042 InputObject::new(&select_where_name)
1043 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1044 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1045 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1046 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1047 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1048 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1049 );
1050 }
1051
1052 let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1053 let mut of_enum = Enum::new(&of_enum_name)
1054 .description(format!("Dimension to apply {} aggregation on", metric_name));
1055 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1056 metric_enums.push(of_enum);
1057
1058 let metric_name_clone = metric_name.clone();
1059 let return_type_ref = dim_type_to_typeref(&metric.return_type);
1060 let metric_desc = metric.description.as_deref()
1061 .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1062 .unwrap_or("Aggregate metric");
1063
1064 let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1065 let default_name = metric_name_clone.clone();
1066 FieldFuture::new(async move {
1067 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1068 let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1069 let key = metric_key(alias);
1070 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1071 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1072 })
1073 })
1074 .description(metric_desc)
1075 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1076 .description("Dimension to aggregate on (default: all rows)"));
1077
1078 if metric.supports_where {
1079 metric_field = metric_field
1080 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1081 .description("Post-aggregation filter (HAVING)"))
1082 .argument(InputValue::new("if", TypeRef::named(&filter_name))
1083 .description("Conditional filter for this metric"));
1084 }
1085
1086 record_fields.push(metric_field);
1087 }
1088
1089 {
1091 let of_enum_name = format!("{}_quantile_Of", cube.name);
1092 let mut of_enum = Enum::new(&of_enum_name)
1093 .description(format!("Dimension to apply quantile on for {}", cube.name));
1094 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1095 metric_enums.push(of_enum);
1096
1097 let of_enum_for_closure = of_enum_name.clone();
1098 let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1099 FieldFuture::new(async move {
1100 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1101 let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1102 let key = metric_key(alias);
1103 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1104 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1105 })
1106 })
1107 .description("Compute a quantile (percentile) of a dimension")
1108 .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1109 .description("Dimension to compute quantile on"))
1110 .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1111 .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1112 record_fields.push(quantile_field);
1113 }
1114
1115 {
1117 let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1118 FieldFuture::new(async move {
1119 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1120 let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1121 let key = metric_key(alias);
1122 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1123 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1124 })
1125 })
1126 .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1127 .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1128 .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1129 record_fields.push(calculate_field);
1130 }
1131
1132 for jd in &cube.joins {
1134 let target_record_name = format!("{}Record", jd.target_cube);
1135 let field_name_owned = jd.field_name.clone();
1136 let mut join_field = Field::new(
1137 &jd.field_name,
1138 TypeRef::named(&target_record_name),
1139 move |ctx| {
1140 let field_name = field_name_owned.clone();
1141 FieldFuture::new(async move {
1142 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1143 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1144 let sub_row: RowMap = obj.iter()
1145 .map(|(k, v)| (k.clone(), v.clone()))
1146 .collect();
1147 Ok(Some(FieldValue::owned_any(sub_row)))
1148 } else {
1149 Ok(Some(FieldValue::value(Value::Null)))
1150 }
1151 })
1152 },
1153 );
1154 if let Some(desc) = &jd.description {
1155 join_field = join_field.description(desc);
1156 }
1157 record_fields.push(join_field);
1158 }
1159
1160 let mut record = Object::new(&record_name);
1161 for f in record_fields { record = record.field(f); }
1162
1163 let mut filter = InputObject::new(&filter_name)
1164 .description(format!("Filter conditions for {} query", cube.name));
1165 for f in filter_fields { filter = filter.field(f); }
1166
1167 let orderby_input_name = format!("{}OrderByInput", cube.name);
1168 let orderby_input = InputObject::new(&orderby_input_name)
1169 .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1170 .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1171 .description("Sort descending by this field"))
1172 .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1173 .description("Sort ascending by this field"))
1174 .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1175 .description("Sort descending by computed/aggregated field name"))
1176 .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1177 .description("Sort ascending by computed/aggregated field name"));
1178
1179 let mut objects = vec![record]; objects.extend(extra_objects);
1180 let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
1181 let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1182
1183 CubeTypes { objects, inputs, enums, unions: extra_unions }
1184}
1185
/// Borrowed accumulator threaded through the recursive walk in
/// `collect_dimension_types`. Each visited dimension node appends the
/// GraphQL artifacts it generates into these shared output vectors.
struct DimCollector<'a> {
    // Cube name; prefixes every generated nested/auxiliary type name.
    cube_name: &'a str,
    // Name of the cube's field-compare enum (used by maximum/minimum args).
    compare_enum_name: &'a str,
    // Name of the cube's root filter input (used by `if` arguments).
    filter_name: &'a str,
    // Output: fields for the cube's Record object.
    record_fields: &'a mut Vec<Field>,
    // Output: fields for the cube's Filter input object.
    filter_fields: &'a mut Vec<InputValue>,
    // Output: auxiliary objects (nested records, array elements, union variants).
    extra_objects: &'a mut Vec<Object>,
    // Output: auxiliary inputs (nested filters, array/includes filters).
    extra_inputs: &'a mut Vec<InputObject>,
    // Output: union types generated for array union fields.
    extra_unions: &'a mut Vec<Union>,
}
1196
/// Recursively walk one dimension node of a cube, emitting into `c`:
/// a resolver `Field` for the record type, an `InputValue` for the filter
/// input, and any auxiliary types (nested records/filters, array element
/// objects, includes-filters, unions) the node requires.
///
/// `prefix` is the underscore-joined path of ancestor group names; it keeps
/// generated type names (`{cube}_{path}_Record`, `_Filter`, `_Element`, …)
/// unique across the dimension tree.
fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
    match node {
        DimensionNode::Leaf(dim) => {
            // Clone everything the resolver closure captures by value.
            let col = dim.column.clone();
            let is_datetime = dim.dim_type == DimType::DateTime;
            let compare_enum = c.compare_enum_name.to_string();
            let cube_filter = c.filter_name.to_string();
            let mut leaf_field = Field::new(
                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
                move |ctx| {
                    let col = col.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        // When any aggregation argument is present the SQL
                        // compiler stores the result under a per-alias
                        // dim-agg key rather than the raw column name.
                        let has_interval = ctx.args.try_get("interval").is_ok();
                        let has_max = ctx.args.try_get("maximum").is_ok();
                        let has_min = ctx.args.try_get("minimum").is_ok();
                        let key = if has_interval || has_max || has_min {
                            let name = ctx.ctx.field().alias().unwrap_or(&col);
                            dim_agg_key(name)
                        } else {
                            col.clone()
                        };
                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                        // DateTime values are normalized to ISO-8601 strings.
                        let gql_val = if is_datetime {
                            json_to_gql_datetime(val)
                        } else {
                            json_to_gql_value(val)
                        };
                        Ok(Some(FieldValue::value(gql_val)))
                    })
                },
            );
            if let Some(desc) = &dim.description {
                leaf_field = leaf_field.description(desc);
            }
            // Aggregation-related arguments shared by every leaf dimension.
            leaf_field = leaf_field
                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is maximum (argMax)"))
                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is minimum (argMin)"))
                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
                    .description("Conditional filter for aggregation"))
                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
                    .description("Post-aggregation value filter (HAVING)"));
            // Time bucketing only makes sense for datetime dimensions.
            if is_datetime {
                leaf_field = leaf_field
                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
            }
            c.record_fields.push(leaf_field);
            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
        }
        DimensionNode::Group { graphql_name, description, children } => {
            // NOTE(review): `full_path` and `new_prefix` below are computed
            // identically; one could be reused for the other.
            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);

            let mut child_record_fields: Vec<Field> = Vec::new();
            let mut child_filter_fields: Vec<InputValue> = Vec::new();
            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };

            // Recurse with fresh record/filter field vectors, but keep
            // appending auxiliary types to the parent's shared vectors.
            let mut child_collector = DimCollector {
                cube_name: c.cube_name,
                compare_enum_name: c.compare_enum_name,
                filter_name: c.filter_name,
                record_fields: &mut child_record_fields,
                filter_fields: &mut child_filter_fields,
                extra_objects: c.extra_objects,
                extra_inputs: c.extra_inputs,
                extra_unions: c.extra_unions,
            };
            for child in children {
                collect_dimension_types(child, &new_prefix, &mut child_collector);
            }

            let mut nested_record = Object::new(&nested_record_name);
            for f in child_record_fields { nested_record = nested_record.field(f); }

            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
            let mut nested_filter = InputObject::new(&nested_filter_name)
                .description(nested_filter_desc);
            for f in child_filter_fields { nested_filter = nested_filter.field(f); }

            // The group field passes the whole parent row through so leaf
            // resolvers below can look up their columns in the same map.
            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
                FieldFuture::new(async move {
                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                    Ok(Some(FieldValue::owned_any(row.clone())))
                })
            });
            if let Some(desc) = description {
                group_field = group_field.description(desc);
            }
            c.record_fields.push(group_field);
            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
            c.extra_objects.push(nested_record);
            c.extra_inputs.push(nested_filter);
        }
        DimensionNode::Array { graphql_name, description, children } => {
            let full_path = if prefix.is_empty() {
                graphql_name.clone()
            } else {
                format!("{prefix}_{graphql_name}")
            };
            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);

            let mut element_obj = Object::new(&element_type_name);
            let mut includes_filter = InputObject::new(&includes_filter_name)
                .description(format!("Element-level filter for {} (used with includes)", graphql_name));

            // (union type name, the Union, its variant objects) collected
            // per union-typed child, registered after the loop.
            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();

            for child in children {
                match &child.field_type {
                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
                        // Scalar element field: read the value stored under
                        // the child's column key in the element RowMap.
                        let col_name = child.column.clone();
                        let is_datetime = *dt == DimType::DateTime;
                        let mut field = Field::new(
                            &child.graphql_name,
                            dim_type_to_typeref(dt),
                            move |ctx| {
                                let col = col_name.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    let gql_val = if is_datetime {
                                        json_to_gql_datetime(val)
                                    } else {
                                        json_to_gql_value(val)
                                    };
                                    Ok(Some(FieldValue::value(gql_val)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);
                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
                        );
                    }
                    crate::cube::definition::ArrayFieldType::Union(variants) => {
                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);

                        let mut gql_union = Union::new(&union_name);
                        let mut variant_objects = Vec::new();

                        // One object type per variant, each exposing a single
                        // field that reads the shared value column.
                        for v in variants {
                            let variant_obj_name = v.type_name.clone();
                            let field_name = v.field_name.clone();
                            let source_type = v.source_type.clone();
                            let type_ref = dim_type_to_typeref(&source_type);

                            let val_col = child.column.clone();
                            let variant_obj = Object::new(&variant_obj_name)
                                .field(Field::new(
                                    &field_name,
                                    type_ref,
                                    move |ctx| {
                                        let col = val_col.clone();
                                        FieldFuture::new(async move {
                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
                                        })
                                    },
                                ));
                            gql_union = gql_union.possible_type(&variant_obj_name);
                            variant_objects.push(variant_obj);
                        }

                        let col_name = child.column.clone();
                        // The sibling child literally named "Type" supplies
                        // the discriminator column for variant resolution.
                        let type_col = children.iter()
                            .find(|f| f.graphql_name == "Type")
                            .map(|f| f.column.clone())
                            .unwrap_or_default();
                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();

                        let mut field = Field::new(
                            &child.graphql_name,
                            TypeRef::named(&union_name),
                            move |ctx| {
                                let col = col_name.clone();
                                let tcol = type_col.clone();
                                let vars = variants_clone.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    // Null/empty-string values resolve the
                                    // whole union field to null.
                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
                                        return Ok(None);
                                    }
                                    let type_str = row.get(&tcol)
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("");
                                    let resolved_type = resolve_union_typename(type_str, &vars);
                                    // Single-entry map: the variant object's
                                    // field resolver reads `col` back out.
                                    let mut elem = RowMap::new();
                                    elem.insert(col.clone(), raw_val);
                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);

                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
                        );

                        union_registrations.push((union_name, gql_union, variant_objects));
                    }
                }
            }

            for (_, union_type, variant_objs) in union_registrations {
                c.extra_objects.extend(variant_objs);
                c.extra_unions.push(union_type);
            }

            let child_columns: Vec<(String, String)> = children.iter()
                .map(|f| (f.graphql_name.clone(), f.column.clone()))
                .collect();
            let element_type_name_clone = element_type_name.clone();

            // The array field "zips" the parallel per-column arrays in the
            // row into one RowMap per element index.
            let mut array_field = Field::new(
                graphql_name,
                TypeRef::named_nn_list_nn(&element_type_name),
                move |ctx| {
                    let cols = child_columns.clone();
                    // NOTE(review): `_etype` is cloned but never used in the
                    // resolver body; candidate for removal.
                    let _etype = element_type_name_clone.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
                            .map(|(gql_name, col)| {
                                let arr = row.get(col)
                                    .and_then(|v| v.as_array())
                                    .cloned()
                                    .unwrap_or_default();
                                (gql_name.as_str(), arr)
                            })
                            .collect();

                        // Element count is taken from the first column's
                        // array; shorter sibling arrays pad with null.
                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
                        let mut elements = Vec::with_capacity(len);
                        for i in 0..len {
                            let mut elem = RowMap::new();
                            // Values are inserted under BOTH the GraphQL name
                            // and the column name — scalar child resolvers
                            // look up by column, union resolvers likewise;
                            // storing both keeps either lookup valid.
                            for (gql_name, arr) in &arrays {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(gql_name.to_string(), val);
                            }
                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(col.clone(), val);
                            }
                            elements.push(FieldValue::owned_any(elem));
                        }
                        Ok(Some(FieldValue::list(elements)))
                    })
                },
            );
            if let Some(desc) = description {
                array_field = array_field.description(desc);
            }
            c.record_fields.push(array_field);

            // Wrapper filter exposing `includes` for element-level matching.
            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
            let wrapper_filter = InputObject::new(&wrapper_filter_name)
                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
                    .description("Match rows where at least one array element satisfies all conditions"));
            c.extra_inputs.push(wrapper_filter);

            c.filter_fields.push(InputValue::new(
                graphql_name,
                TypeRef::named(&wrapper_filter_name),
            ));

            c.extra_objects.push(element_obj);
            c.extra_inputs.push(includes_filter);
        }
    }
}
1496
1497fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1503 for v in variants {
1504 if v.source_type_names.iter().any(|s| s == type_str) {
1505 return v.type_name.clone();
1506 }
1507 }
1508 variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1509}
1510
1511fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1512 match dt {
1513 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
1514 DimType::Int => TypeRef::named(TypeRef::INT),
1515 DimType::Float => TypeRef::named(TypeRef::FLOAT),
1516 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1517 }
1518}
1519
1520fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1521 match dt {
1522 DimType::String => "StringFilter",
1523 DimType::Int => "IntFilter",
1524 DimType::Float => "FloatFilter",
1525 DimType::DateTime => "DateTimeFilter",
1526 DimType::Bool => "BoolFilter",
1527 }
1528}
1529
1530pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1531 match v {
1532 serde_json::Value::Null => Value::Null,
1533 serde_json::Value::Bool(b) => Value::from(b),
1534 serde_json::Value::Number(n) => {
1535 if let Some(i) = n.as_i64() { Value::from(i) }
1536 else if let Some(f) = n.as_f64() { Value::from(f) }
1537 else { Value::from(n.to_string()) }
1538 }
1539 serde_json::Value::String(s) => Value::from(s),
1540 _ => Value::from(v.to_string()),
1541 }
1542}
1543
1544fn build_join_expr(
1547 jd: &crate::cube::definition::JoinDef,
1548 target_cube: &CubeDefinition,
1549 sub_field: &async_graphql::SelectionField<'_>,
1550 network: &str,
1551 join_idx: usize,
1552) -> JoinExpr {
1553 let target_flat = target_cube.flat_dimensions();
1554 let target_table = target_cube.table_for_chain(network);
1555
1556 let mut requested_paths = HashSet::new();
1557 collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1558
1559 let mut selects: Vec<SelectExpr> = target_flat.iter()
1560 .filter(|(path, _)| requested_paths.contains(path))
1561 .map(|(_, dim)| SelectExpr::Column {
1562 column: dim.column.clone(),
1563 alias: None,
1564 })
1565 .collect();
1566
1567 if selects.is_empty() {
1568 selects = target_flat.iter()
1569 .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1570 .collect();
1571 }
1572
1573 let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
1574
1575 let group_by = if is_aggregate {
1576 let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1577 for sel in &selects {
1578 if let SelectExpr::Column { column, .. } = sel {
1579 if !column.contains('(') && !gb.contains(column) {
1580 gb.push(column.clone());
1581 }
1582 }
1583 }
1584 gb
1585 } else {
1586 vec![]
1587 };
1588
1589 JoinExpr {
1590 schema: target_cube.schema.clone(),
1591 table: target_table,
1592 alias: format!("_j{}", join_idx),
1593 conditions: jd.conditions.clone(),
1594 selects,
1595 group_by,
1596 use_final: target_cube.use_final,
1597 is_aggregate,
1598 target_cube: jd.target_cube.clone(),
1599 join_field: sub_field.name().to_string(),
1600 join_type: jd.join_type.clone(),
1601 }
1602}
1603
1604fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1607 match v {
1608 serde_json::Value::String(s) => {
1609 let iso = if s.contains('T') {
1610 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1611 } else {
1612 let replaced = s.replacen(' ', "T", 1);
1613 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1614 };
1615 Value::from(iso)
1616 }
1617 other => json_to_gql_value(other),
1618 }
1619}