1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Row-map key under which a metric result is stored: `"__" + alias`.
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Row-map key under which a dimension-aggregation (argMax/argMin) result is
/// stored: `"__da_" + alias`.
pub fn dim_agg_key(alias: &str) -> String {
    ["__da_", alias].concat()
}
20
/// Async callback that executes a compiled SQL string with its bound
/// parameters and resolves to the result rows, or an error message string.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// An extra, caller-defined argument exposed on a chain-group wrapper field.
#[derive(Debug, Clone)]
pub struct WrapperArg {
    // GraphQL argument name; resolved values land in `ChainContext.extra`
    // under this key.
    pub name: String,
    // Name of the GraphQL type the argument takes (enum/scalar type name).
    pub type_ref: String,
    // Description shown in the schema for this argument.
    pub description: String,
    // Optional fixed value set — not read anywhere in this chunk;
    // presumably consumed when registering `extra_types`. TODO confirm.
    pub values: Option<Vec<String>>,
}
43
/// Configuration for one chain-group wrapper object (e.g. "EVM", "Solana").
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    // Wrapper object name; also the root-level field name for this group.
    pub name: String,
    // Which cube chain group this wrapper exposes (cubes are filtered by it).
    pub group: ChainGroup,
    // Networks served by this group; the FIRST entry is the default network
    // used when no `network` argument is supplied.
    pub networks: Vec<String>,
    // When true, a required `network` enum argument is added to the wrapper
    // field and a `{name}Network` enum is registered.
    pub has_network_arg: bool,
    // Additional wrapper-level arguments whose resolved values are forwarded
    // into `ChainContext.extra`.
    pub extra_args: Vec<WrapperArg>,
}
60
/// Hook that rewrites a compiled table name using the per-request chain
/// context (e.g. to route queries to network-specific tables).
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
65
/// Top-level options for `build_schema`.
pub struct SchemaConfig {
    // Chain groups exposed as wrapper fields on the root query object.
    pub chain_groups: Vec<ChainGroupConfig>,
    // Intended root query type name.
    // NOTE(review): `build_schema` currently hardcodes "Query" and never
    // reads this field — confirm whether it should be wired through.
    pub root_query_name: String,
    // Schema-level stats callback; a request-scoped callback placed in
    // context data takes precedence at resolve time.
    pub stats_callback: Option<StatsCallback>,
    // Optional table-name rewrite hook, stored in schema data when present.
    pub table_name_transform: Option<TableNameTransform>,
    // Extra enum types to register verbatim (e.g. types referenced by
    // `WrapperArg.type_ref`).
    pub extra_types: Vec<Enum>,
}
78
79impl Default for SchemaConfig {
80 fn default() -> Self {
81 Self {
82 chain_groups: vec![
83 ChainGroupConfig {
84 name: "EVM".to_string(),
85 group: ChainGroup::Evm,
86 networks: vec!["eth".into(), "bsc".into()],
87 has_network_arg: true,
88 extra_args: vec![],
89 },
90 ChainGroupConfig {
91 name: "Solana".to_string(),
92 group: ChainGroup::Solana,
93 networks: vec!["sol".into()],
94 has_network_arg: false,
95 extra_args: vec![],
96 },
97 ChainGroupConfig {
98 name: "Trading".to_string(),
99 group: ChainGroup::Trading,
100 networks: vec!["sol".into(), "eth".into(), "bsc".into()],
101 has_network_arg: false,
102 extra_args: vec![],
103 },
104 ],
105 root_query_name: "ChainStream".to_string(),
106 stats_callback: None,
107 table_name_transform: None,
108 extra_types: vec![],
109 }
110 }
111}
112
/// Per-request context produced by a chain-group wrapper resolver and read
/// downstream by cube field resolvers (network routing, extra args).
pub struct ChainContext {
    // Selected network: the `network` argument value or the group default.
    pub network: String,
    // Resolved extra wrapper arguments, keyed by argument name, all
    // stringified (enum name / bool / string).
    pub extra: HashMap<String, String>,
}
120
/// Builds the complete dynamic GraphQL schema from the cube registry.
///
/// Registration order: shared input/enum types (limit, ordering, time units,
/// filter primitives, HAVING inputs), per-group network enums, per-cube
/// record/filter types, then one wrapper object per chain group exposing
/// that group's cubes, plus an internal `_cubeMetadata` JSON field.
///
/// NOTE(review): the root object name is hardcoded as "Query" here and when
/// constructing the root `Object`; `config.root_query_name` is not read in
/// this function — confirm intent.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    let mut builder = Schema::build("Query", None, None);

    // --- Shared input/enum types used by every cube field. ---
    builder = builder.register(filter_types::build_limit_input());
    builder = builder.register(
        InputObject::new("LimitByInput")
            .description("Limit results per group (similar to ClickHouse LIMIT BY)")
            .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
                .description("Comma-separated dimension names to group by"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
                .description("Maximum rows per group"))
            .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
                .description("Rows to skip per group")),
    );
    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }
    builder = builder.register(
        Enum::new("TimeUnit")
            .description("Time unit for interval bucketing")
            .item(EnumItem::new("seconds"))
            .item(EnumItem::new("minutes"))
            .item(EnumItem::new("hours"))
            .item(EnumItem::new("days"))
            .item(EnumItem::new("weeks"))
            .item(EnumItem::new("months")),
    );
    builder = builder.register(
        InputObject::new("TimeIntervalInput")
            .description("Time bucketing interval for DateTime dimensions")
            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
    );
    builder = builder.register(
        InputObject::new("DimSelectWhere")
            .description("Post-aggregation filter for dimension values (HAVING clause)")
            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
    );

    // Caller-supplied types (e.g. enums backing WrapperArg.type_ref).
    // Note: this moves `extra_types` out of `config` (partial move).
    for extra_enum in config.extra_types {
        builder = builder.register(extra_enum);
    }

    // Per-group "{Name}Network" enums, only for groups with a network arg.
    for grp in &config.chain_groups {
        if grp.has_network_arg {
            let enum_name = format!("{}Network", grp.name);
            let mut net_enum = Enum::new(&enum_name)
                .description(format!("{} network selector", grp.name));
            for net in &grp.networks {
                net_enum = net_enum.item(EnumItem::new(net));
            }
            builder = builder.register(net_enum);
        }
    }

    // Register each cube's generated types once, even if the cube appears
    // in multiple chain groups.
    let mut registered_cubes: HashSet<String> = HashSet::new();
    for cube in registry.cubes() {
        if registered_cubes.contains(&cube.name) { continue; }
        registered_cubes.insert(cube.name.clone());

        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }
        for un in types.unions { builder = builder.register(un); }
    }

    let mut query = Object::new("Query");

    // One wrapper object + root field per chain group.
    for grp in &config.chain_groups {
        let wrapper_type_name = grp.name.clone();

        let mut wrapper_obj = Object::new(&wrapper_type_name);

        // Attach a field for every cube belonging to this group.
        for cube in registry.cubes() {
            if !cube.chain_groups.contains(&grp.group) { continue; }

            let cube_field = build_cube_field(
                cube,
                dialect.clone(),
                executor.clone(),
                config.stats_callback.clone(),
            );
            wrapper_obj = wrapper_obj.field(cube_field);
        }
        builder = builder.register(wrapper_obj);

        // Data captured by the wrapper resolver closure below.
        let default_network = grp.networks.first().cloned().unwrap_or_default();
        let has_net_arg = grp.has_network_arg;
        let net_enum_name = format!("{}Network", grp.name);
        let wrapper_type_for_resolver = wrapper_type_name.clone();
        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();

        // The wrapper resolver builds a ChainContext (network + extra args)
        // that child cube resolvers downcast from `parent_value`.
        let mut wrapper_field = Field::new(
            &wrapper_type_name,
            TypeRef::named_nn(&wrapper_type_name),
            move |ctx| {
                let default = default_network.clone();
                let has_arg = has_net_arg;
                let arg_names = extra_arg_names.clone();
                FieldFuture::new(async move {
                    // Network: explicit enum argument, else the group default.
                    let network = if has_arg {
                        ctx.args.try_get("network")
                            .ok()
                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
                            .unwrap_or(default)
                    } else {
                        default
                    };
                    // Extra args are stringified in priority order:
                    // enum name, then bool, then string.
                    let mut extra = HashMap::new();
                    for arg_name in &arg_names {
                        if let Ok(val) = ctx.args.try_get(arg_name) {
                            let resolved = val.enum_name().ok().map(|s| s.to_string())
                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
                                .or_else(|| val.string().ok().map(|s| s.to_string()));
                            if let Some(v) = resolved {
                                extra.insert(arg_name.clone(), v);
                            }
                        }
                    }
                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
                })
            },
        );

        if grp.has_network_arg {
            wrapper_field = wrapper_field.argument(
                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
                    .description(format!("{} network to query", wrapper_type_for_resolver)),
            );
        }
        for wa in &grp.extra_args {
            wrapper_field = wrapper_field.argument(
                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
                    .description(&wa.description),
            );
        }

        query = query.field(wrapper_field);
    }

    // Internal introspection field: JSON metadata of all cubes per group.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
        .map(|g| (g.name.clone(), g.group.clone()))
        .collect();
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            let groups = metadata_groups.clone();
            FieldFuture::new(async move {
                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
                for (group_name, group_enum) in &groups {
                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
                        .filter(|c| c.chain_groups.contains(group_enum))
                        .map(serialize_cube_metadata)
                        .collect();
                    group_metadata.push(serde_json::json!({
                        "group": group_name,
                        "cubes": cubes_in_group,
                    }));
                }
                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes grouped by chain");
    query = query.field(metadata_field);

    builder = builder.register(query);
    // The registry is stored as schema data so resolvers can look cubes up.
    builder = builder.data(registry);

    if let Some(transform) = config.table_name_transform.clone() {
        builder = builder.data(transform);
    }

    builder.finish()
}
336
/// Serializes one cube definition into the JSON shape emitted by the
/// `_cubeMetadata` introspection field.
fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
    serde_json::json!({
        "name": cube.name,
        "description": cube.description,
        "schema": cube.schema,
        "tablePattern": cube.table_pattern,
        // Enum variants rendered via Debug (e.g. "Evm", "Solana").
        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
        // Optional metric fields are only included when present.
        "metrics": cube.metrics.iter().map(|m| {
            let mut obj = serde_json::json!({
                "name": m.name,
                "returnType": format!("{:?}", m.return_type),
            });
            if let Some(ref tmpl) = m.expression_template {
                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
            }
            if let Some(ref desc) = m.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        }).collect::<Vec<_>>(),
        "selectors": cube.selectors.iter().map(|s| {
            serde_json::json!({
                "name": s.graphql_name,
                "column": s.column,
                "type": format!("{:?}", s.dim_type),
            })
        }).collect::<Vec<_>>(),
        "dimensions": serialize_dims(&cube.dimensions),
        "joins": cube.joins.iter().map(|j| {
            serde_json::json!({
                "field": j.field_name,
                "target": j.target_cube,
                "joinType": format!("{:?}", j.join_type),
            })
        }).collect::<Vec<_>>(),
        "tableRoutes": cube.table_routes.iter().map(|r| {
            serde_json::json!({
                "schema": r.schema,
                "tablePattern": r.table_pattern,
                "availableColumns": r.available_columns,
                "priority": r.priority,
            })
        }).collect::<Vec<_>>(),
        "defaultLimit": cube.default_limit,
        "maxLimit": cube.max_limit,
    })
}
384
/// Builds the GraphQL field for one cube: the resolver parses the selection
/// set into compiler IR, compiles it to SQL via the dialect, executes it,
/// and reshapes the result rows (alias remapping, join nesting) before
/// handing them to the per-record field resolvers.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // ChainContext is set by the chain-group wrapper resolver.
                // NOTE(review): the "sol" fallback when no wrapper context
                // exists is hardcoded — confirm it is the intended default.
                let chain_ctx = ctx.parent_value
                    .try_downcast_ref::<ChainContext>().ok();
                let network = chain_ctx
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Gather everything the parser needs from the selection set.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // Selected sub-fields that match a join definition become
                // SQL joins; unknown target cubes are silently skipped.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                // Optional table-name rewrite (e.g. network-specific tables),
                // only when a chain context is available.
                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
                    if let Some(cctx) = chain_ctx {
                        ir.table = transform(&ir.table, cctx);
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Undo any alias renaming the dialect applied during compile,
                // so rows are keyed by the names the resolvers expect.
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Fold flat "<join alias>.<col>" columns into one nested JSON
                // object per join, stored under the join's field name.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // Request-scoped stats callback (context data) takes
                // precedence over the schema-level one captured at build time.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    // Common query arguments shared by every cube field.
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // Shorthand filter argument for each declared selector dimension.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
540
/// Recursively serializes a dimension tree into the JSON array used by the
/// `_cubeMetadata` field; group nodes nest their children, array nodes list
/// their element fields flat.
fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
    serde_json::Value::Array(dims.iter().map(|d| match d {
        // Leaf: a single column-backed dimension.
        DimensionNode::Leaf(dim) => {
            let mut obj = serde_json::json!({
                "name": dim.graphql_name,
                "column": dim.column,
                "type": format!("{:?}", dim.dim_type),
            });
            if let Some(desc) = &dim.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        // Group: nested object of further dimension nodes.
        DimensionNode::Group { graphql_name, description, children } => {
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "children": serialize_dims(children),
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        // Array: repeated record; children are flat typed fields, and the
        // node is tagged with "kind": "array".
        DimensionNode::Array { graphql_name, description, children } => {
            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
                serde_json::json!({
                    "name": f.graphql_name,
                    "column": f.column,
                    "type": format!("{:?}", f.field_type),
                })
            }).collect();
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "kind": "array",
                "fields": fields,
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
    }).collect())
}
584
/// Scans the selection set for fields whose name matches a cube metric and
/// turns each into a `MetricRequest` (function name, result alias, target
/// dimension, optional HAVING value and conditional filter).
fn extract_metric_requests(
    ctx: &async_graphql::dynamic::ResolverContext,
    cube: &CubeDefinition,
) -> Vec<compiler::parser::MetricRequest> {
    let mut requests = Vec::new();

    for sub_field in ctx.ctx.field().selection_set() {
        let name = sub_field.name();
        if !cube.has_metric(name) {
            continue;
        }

        // Unresolvable arguments skip the metric rather than erroring.
        let args = match sub_field.arguments() {
            Ok(args) => args,
            Err(_) => continue,
        };

        // "of": dimension to aggregate over; "*" means all rows.
        let of_dimension = args
            .iter()
            .find(|(k, _)| k.as_str() == "of")
            .and_then(|(_, v)| match v {
                async_graphql::Value::Enum(e) => Some(e.to_string()),
                async_graphql::Value::String(s) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_else(|| "*".to_string());

        // "selectWhere": raw HAVING-style value; parsed downstream.
        let select_where_value = args
            .iter()
            .find(|(k, _)| k.as_str() == "selectWhere")
            .map(|(_, v)| v.clone());

        // "if": conditional filter; unparsable or empty filters are dropped.
        let condition_filter = args
            .iter()
            .find(|(k, _)| k.as_str() == "if")
            .and_then(|(_, v)| {
                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
            });

        // GraphQL alias (or metric name) keys the result column.
        let gql_alias = sub_field.alias().unwrap_or(name).to_string();
        requests.push(compiler::parser::MetricRequest {
            function: name.to_string(),
            alias: gql_alias,
            of_dimension,
            select_where_value,
            condition_filter,
        });
    }

    requests
}
640
641fn extract_requested_fields(
642 ctx: &async_graphql::dynamic::ResolverContext,
643 cube: &CubeDefinition,
644) -> HashSet<String> {
645 let mut fields = HashSet::new();
646 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
647 fields
648}
649
650fn collect_selection_paths(
651 field: &async_graphql::SelectionField<'_>,
652 prefix: &str,
653 out: &mut HashSet<String>,
654 metrics: &[MetricDef],
655) {
656 for sub in field.selection_set() {
657 let name = sub.name();
658 if metrics.iter().any(|m| m.name == name) {
659 continue;
660 }
661 let path = if prefix.is_empty() {
662 name.to_string()
663 } else {
664 format!("{prefix}_{name}")
665 };
666 let has_children = sub.selection_set().next().is_some();
667 if has_children {
668 collect_selection_paths(&sub, &path, out, metrics);
669 } else {
670 out.insert(path);
671 }
672 }
673}
674
/// Pairs of (aliased flattened field path, underlying SQL column) collected
/// from the GraphQL selection set.
pub type FieldAliasMap = Vec<(String, String)>;
678
/// One `quantile(of: …, level: …)` field request from the selection set.
pub struct QuantileRequest {
    // Result alias (GraphQL alias, or "quantile" when none given).
    pub alias: String,
    // Dimension path to compute the quantile over ("*" = all rows).
    pub of_dimension: String,
    // Quantile level in [0, 1]; defaults to 0.5 when not supplied.
    pub level: f64,
}
685
/// One `calculate(expression: …)` field request from the selection set.
pub struct CalculateRequest {
    // Result alias (GraphQL alias, or "calculate" when none given).
    pub alias: String,
    // Raw expression string forwarded to the compiler.
    pub expression: String,
}
691
/// A time-bucketing request: a dimension selected with an
/// `interval: {in: …, count: …}` argument.
pub struct TimeIntervalRequest {
    // Flattened `_`-joined dimension path.
    pub field_path: String,
    // Alias the result should be keyed under.
    pub graphql_alias: String,
    // Underlying SQL column of the dimension.
    pub column: String,
    // Time unit name (matches the TimeUnit enum values).
    pub unit: String,
    // Number of units per bucket.
    pub count: i64,
}
701
/// A dimension-aggregation request: a dimension selected with a
/// `maximum:`/`minimum:` argument (compiled to argMax/argMin).
pub struct DimAggRequest {
    // Flattened `_`-joined path of the value dimension.
    pub field_path: String,
    // Alias the result should be keyed under.
    pub graphql_alias: String,
    // Column whose value is returned.
    pub value_column: String,
    // ArgMax (maximum:) or ArgMin (minimum:).
    pub agg_type: DimAggType,
    // Column the max/min comparison runs over.
    pub compare_column: String,
    // Optional "if:" conditional filter (empty filters dropped).
    pub condition_filter: Option<FilterNode>,
    // Optional raw "selectWhere:" HAVING value; parsed downstream.
    pub select_where_value: Option<async_graphql::Value>,
}
713
714fn extract_quantile_requests(
715 ctx: &async_graphql::dynamic::ResolverContext,
716) -> Vec<QuantileRequest> {
717 let mut requests = Vec::new();
718 for sub_field in ctx.ctx.field().selection_set() {
719 if sub_field.name() != "quantile" { continue; }
720 let args = match sub_field.arguments() {
721 Ok(a) => a,
722 Err(_) => continue,
723 };
724 let of_dim = args.iter()
725 .find(|(k, _)| k.as_str() == "of")
726 .and_then(|(_, v)| match v {
727 async_graphql::Value::Enum(e) => Some(e.to_string()),
728 async_graphql::Value::String(s) => Some(s.clone()),
729 _ => None,
730 })
731 .unwrap_or_else(|| "*".to_string());
732 let level = args.iter()
733 .find(|(k, _)| k.as_str() == "level")
734 .and_then(|(_, v)| match v {
735 async_graphql::Value::Number(n) => n.as_f64(),
736 _ => None,
737 })
738 .unwrap_or(0.5);
739 let alias = sub_field.alias().unwrap_or("quantile").to_string();
740 requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
741 }
742 requests
743}
744
745fn extract_field_aliases(
746 ctx: &async_graphql::dynamic::ResolverContext,
747 cube: &CubeDefinition,
748) -> FieldAliasMap {
749 let flat = cube.flat_dimensions();
750 let mut aliases = Vec::new();
751 collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
752 aliases
753}
754
/// Recursive worker for `extract_field_aliases`: walks the selection tree
/// and, for each aliased leaf that maps to a known flattened dimension,
/// records (alias path, SQL column).
fn collect_field_aliases(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut FieldAliasMap,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        // Skip aggregation fields: cube metrics plus the built-in
        // calculate/quantile pseudo-metrics.
        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
            continue;
        }
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_field_aliases(&sub, &path, flat, metrics, out);
        } else if let Some(alias) = sub.alias() {
            // Only leaves that resolve to a known dimension are recorded.
            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                // The recorded path uses the alias at the leaf position but
                // keeps the real (unaliased) parent prefix.
                let alias_path = if prefix.is_empty() {
                    alias.to_string()
                } else {
                    format!("{prefix}_{alias}")
                };
                out.push((alias_path, dim.column.clone()));
            }
        }
    }
}
783
784fn extract_calculate_requests(
785 ctx: &async_graphql::dynamic::ResolverContext,
786) -> Vec<CalculateRequest> {
787 let mut requests = Vec::new();
788 for sub_field in ctx.ctx.field().selection_set() {
789 if sub_field.name() != "calculate" { continue; }
790 let args = match sub_field.arguments() {
791 Ok(a) => a,
792 Err(_) => continue,
793 };
794 let expression = args.iter()
795 .find(|(k, _)| k.as_str() == "expression")
796 .and_then(|(_, v)| match v {
797 async_graphql::Value::String(s) => Some(s.clone()),
798 _ => None,
799 });
800 if let Some(expr) = expression {
801 let alias = sub_field.alias().unwrap_or("calculate").to_string();
802 requests.push(CalculateRequest { alias, expression: expr });
803 }
804 }
805 requests
806}
807
808fn extract_dim_agg_requests(
809 ctx: &async_graphql::dynamic::ResolverContext,
810 cube: &CubeDefinition,
811) -> Vec<DimAggRequest> {
812 let flat = cube.flat_dimensions();
813 let mut requests = Vec::new();
814 collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
815 requests
816}
817
/// Recursive worker for `extract_dim_agg_requests`: walks the selection
/// tree looking for leaves with a `maximum:`/`minimum:` argument naming a
/// compare dimension, and resolves both sides to SQL columns.
fn collect_dim_agg_paths(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    cube: &CubeDefinition,
    out: &mut Vec<DimAggRequest>,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        if metrics.iter().any(|m| m.name == name) {
            continue;
        }
        let path = if prefix.is_empty() {
            name.to_string()
        } else {
            format!("{prefix}_{name}")
        };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
        } else {
            let args = match sub.arguments() {
                Ok(a) => a,
                Err(_) => continue,
            };
            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");

            // `maximum:` wins when both are present; leaves with neither
            // (or a non-enum/string value) are skipped entirely.
            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
                let cp = match v {
                    async_graphql::Value::Enum(e) => e.to_string(),
                    async_graphql::Value::String(s) => s.clone(),
                    _ => continue,
                };
                (DimAggType::ArgMax, cp)
            } else if let Some((_, v)) = min_val {
                let cp = match v {
                    async_graphql::Value::Enum(e) => e.to_string(),
                    async_graphql::Value::String(s) => s.clone(),
                    _ => continue,
                };
                (DimAggType::ArgMin, cp)
            } else {
                continue;
            };

            // Both the value path and the compare path must resolve to
            // known flattened dimensions.
            let value_column = flat.iter()
                .find(|(p, _)| p == &path)
                .map(|(_, dim)| dim.column.clone());
            let compare_column = flat.iter()
                .find(|(p, _)| p == &compare_path)
                .map(|(_, dim)| dim.column.clone());

            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
                // Optional "if:" conditional filter; empty filters dropped.
                let condition_filter = args.iter()
                    .find(|(k, _)| k.as_str() == "if")
                    .and_then(|(_, v)| {
                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
                    });
                // Optional raw "selectWhere:" HAVING value.
                let select_where_value = args.iter()
                    .find(|(k, _)| k.as_str() == "selectWhere")
                    .map(|(_, v)| v.clone());

                let gql_alias = sub.alias()
                    .map(|a| a.to_string())
                    .unwrap_or_else(|| path.clone());
                out.push(DimAggRequest {
                    field_path: path,
                    graphql_alias: gql_alias,
                    value_column: vc,
                    agg_type,
                    compare_column: cc,
                    condition_filter,
                    select_where_value,
                });
            }
        }
    }
}
899
900fn extract_time_interval_requests(
901 ctx: &async_graphql::dynamic::ResolverContext,
902 cube: &CubeDefinition,
903) -> Vec<TimeIntervalRequest> {
904 let flat = cube.flat_dimensions();
905 let mut requests = Vec::new();
906 collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
907 requests
908}
909
/// Recursive worker for `extract_time_interval_requests`: walks the
/// selection tree looking for leaves with an
/// `interval: {in: <TimeUnit>, count: <Int>}` argument that map to a known
/// flattened dimension.
fn collect_time_interval_paths(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut Vec<TimeIntervalRequest>,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        if metrics.iter().any(|m| m.name == name) { continue; }
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_time_interval_paths(&sub, &path, flat, metrics, out);
        } else {
            let args = match sub.arguments() {
                Ok(a) => a,
                Err(_) => continue,
            };
            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
            // The argument must be an input object with both "in" and
            // "count"; anything else is silently ignored.
            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
                let unit = obj.get("in")
                    .and_then(|v| match v {
                        async_graphql::Value::Enum(e) => Some(e.to_string()),
                        async_graphql::Value::String(s) => Some(s.clone()),
                        _ => None,
                    });
                let count = obj.get("count")
                    .and_then(|v| match v {
                        async_graphql::Value::Number(n) => n.as_i64(),
                        _ => None,
                    });
                if let (Some(unit), Some(count)) = (unit, count) {
                    // Only record leaves that resolve to a known dimension.
                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                        let gql_alias = sub.alias().unwrap_or(name).to_string();
                        out.push(TimeIntervalRequest {
                            field_path: path,
                            graphql_alias: gql_alias,
                            column: dim.column.clone(),
                            unit,
                            count,
                        });
                    }
                }
            }
        }
    }
}
958
/// GraphQL types generated for one cube by `build_cube_types`, split by
/// kind so `build_schema` can register each with the schema builder.
struct CubeTypes {
    // Record object plus any nested group/array objects.
    objects: Vec<Object>,
    // Filter and selectWhere input objects.
    inputs: Vec<InputObject>,
    // Compare-field and per-metric "of" enums.
    enums: Vec<Enum>,
    // Union types, when a dimension produces them.
    unions: Vec<Union>,
}
969
970fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
971 let record_name = format!("{}Record", cube.name);
972 let filter_name = format!("{}Filter", cube.name);
973 let compare_enum_name = format!("{}CompareFields", cube.name);
974
975 let flat_dims = cube.flat_dimensions();
976
977 let mut compare_enum = Enum::new(&compare_enum_name)
978 .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
979 for (path, _) in &flat_dims {
980 compare_enum = compare_enum.item(EnumItem::new(path));
981 }
982
983 let mut record_fields: Vec<Field> = Vec::new();
984 let mut filter_fields: Vec<InputValue> = Vec::new();
985 let mut extra_objects: Vec<Object> = Vec::new();
986 let mut extra_inputs: Vec<InputObject> = Vec::new();
987 let mut extra_unions: Vec<Union> = Vec::new();
988
989 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
990 .description("OR combinator — matches if any sub-filter matches"));
991
992 {
993 let mut collector = DimCollector {
994 cube_name: &cube.name,
995 compare_enum_name: &compare_enum_name,
996 filter_name: &filter_name,
997 record_fields: &mut record_fields,
998 filter_fields: &mut filter_fields,
999 extra_objects: &mut extra_objects,
1000 extra_inputs: &mut extra_inputs,
1001 extra_unions: &mut extra_unions,
1002 };
1003 for node in &cube.dimensions {
1004 collect_dimension_types(node, "", &mut collector);
1005 }
1006 }
1007
1008 let mut metric_enums: Vec<Enum> = Vec::new();
1009 let builtin_descs: std::collections::HashMap<&str, &str> = [
1010 ("count", "Count of rows or distinct values"),
1011 ("sum", "Sum of values"),
1012 ("avg", "Average of values"),
1013 ("min", "Minimum value"),
1014 ("max", "Maximum value"),
1015 ("uniq", "Count of unique (distinct) values"),
1016 ].into_iter().collect();
1017
1018 for metric in &cube.metrics {
1019 let metric_name = &metric.name;
1020 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1021
1022 if metric.supports_where {
1023 extra_inputs.push(
1024 InputObject::new(&select_where_name)
1025 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1026 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1027 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1028 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1029 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1030 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1031 );
1032 }
1033
1034 let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1035 let mut of_enum = Enum::new(&of_enum_name)
1036 .description(format!("Dimension to apply {} aggregation on", metric_name));
1037 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1038 metric_enums.push(of_enum);
1039
1040 let metric_name_clone = metric_name.clone();
1041 let return_type_ref = dim_type_to_typeref(&metric.return_type);
1042 let metric_desc = metric.description.as_deref()
1043 .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1044 .unwrap_or("Aggregate metric");
1045
1046 let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1047 let default_name = metric_name_clone.clone();
1048 FieldFuture::new(async move {
1049 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1050 let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1051 let key = metric_key(alias);
1052 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1053 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1054 })
1055 })
1056 .description(metric_desc)
1057 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1058 .description("Dimension to aggregate on (default: all rows)"));
1059
1060 if metric.supports_where {
1061 metric_field = metric_field
1062 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1063 .description("Post-aggregation filter (HAVING)"))
1064 .argument(InputValue::new("if", TypeRef::named(&filter_name))
1065 .description("Conditional filter for this metric"));
1066 }
1067
1068 record_fields.push(metric_field);
1069 }
1070
1071 {
1073 let of_enum_name = format!("{}_quantile_Of", cube.name);
1074 let mut of_enum = Enum::new(&of_enum_name)
1075 .description(format!("Dimension to apply quantile on for {}", cube.name));
1076 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1077 metric_enums.push(of_enum);
1078
1079 let of_enum_for_closure = of_enum_name.clone();
1080 let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1081 FieldFuture::new(async move {
1082 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1083 let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1084 let key = metric_key(alias);
1085 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1086 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1087 })
1088 })
1089 .description("Compute a quantile (percentile) of a dimension")
1090 .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1091 .description("Dimension to compute quantile on"))
1092 .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1093 .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1094 record_fields.push(quantile_field);
1095 }
1096
1097 {
1099 let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1100 FieldFuture::new(async move {
1101 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1102 let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1103 let key = metric_key(alias);
1104 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1105 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1106 })
1107 })
1108 .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1109 .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1110 .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1111 record_fields.push(calculate_field);
1112 }
1113
1114 for jd in &cube.joins {
1116 let target_record_name = format!("{}Record", jd.target_cube);
1117 let field_name_owned = jd.field_name.clone();
1118 let mut join_field = Field::new(
1119 &jd.field_name,
1120 TypeRef::named(&target_record_name),
1121 move |ctx| {
1122 let field_name = field_name_owned.clone();
1123 FieldFuture::new(async move {
1124 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1125 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1126 let sub_row: RowMap = obj.iter()
1127 .map(|(k, v)| (k.clone(), v.clone()))
1128 .collect();
1129 Ok(Some(FieldValue::owned_any(sub_row)))
1130 } else {
1131 Ok(Some(FieldValue::value(Value::Null)))
1132 }
1133 })
1134 },
1135 );
1136 if let Some(desc) = &jd.description {
1137 join_field = join_field.description(desc);
1138 }
1139 record_fields.push(join_field);
1140 }
1141
1142 let mut record = Object::new(&record_name);
1143 for f in record_fields { record = record.field(f); }
1144
1145 let mut filter = InputObject::new(&filter_name)
1146 .description(format!("Filter conditions for {} query", cube.name));
1147 for f in filter_fields { filter = filter.field(f); }
1148
1149 let orderby_input_name = format!("{}OrderByInput", cube.name);
1150 let orderby_input = InputObject::new(&orderby_input_name)
1151 .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1152 .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1153 .description("Sort descending by this field"))
1154 .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1155 .description("Sort ascending by this field"))
1156 .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1157 .description("Sort descending by computed/aggregated field name"))
1158 .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1159 .description("Sort ascending by computed/aggregated field name"));
1160
1161 let mut objects = vec![record]; objects.extend(extra_objects);
1162 let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
1163 let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1164
1165 CubeTypes { objects, inputs, enums, unions: extra_unions }
1166}
1167
/// Mutable accumulator threaded through `collect_dimension_types` while it
/// walks a cube's dimension tree. Nested groups and arrays recurse with
/// their own `record_fields`/`filter_fields` buffers but share the
/// `extra_*` sinks, so every generated auxiliary type is registered once
/// at the cube level.
struct DimCollector<'a> {
    // Cube name; used as the prefix of generated type names
    // (e.g. "{cube}_{path}_Record", "{cube}_{path}_Filter").
    cube_name: &'a str,
    // Name of the cube's compare enum, referenced by the leaf-level
    // `maximum` / `minimum` (argMax/argMin) arguments.
    compare_enum_name: &'a str,
    // Name of the cube's top-level filter input, referenced by the
    // leaf-level `if` (conditional aggregation) argument.
    filter_name: &'a str,
    // Output fields accumulated for the current record object.
    record_fields: &'a mut Vec<Field>,
    // Input fields accumulated for the current filter input object.
    filter_fields: &'a mut Vec<InputValue>,
    // Additional generated object types (nested records, array elements,
    // union variants).
    extra_objects: &'a mut Vec<Object>,
    // Additional generated input types (nested filters, array filters).
    extra_inputs: &'a mut Vec<InputObject>,
    // Generated union types for array union fields.
    extra_unions: &'a mut Vec<Union>,
}
1178
/// Recursively walks a cube's dimension tree and registers, on the collector
/// `c`, the GraphQL output fields, filter input fields, and any auxiliary
/// types (nested record objects, nested filters, array element objects,
/// includes-filters, unions) that each node requires.
///
/// `prefix` is the underscore-joined path of ancestor group names; it is used
/// to build unique generated type names such as `{cube}_{path}_Record`.
fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
    match node {
        DimensionNode::Leaf(dim) => {
            // Clone everything the resolver closure captures so it is 'static.
            let col = dim.column.clone();
            let is_datetime = dim.dim_type == DimType::DateTime;
            let compare_enum = c.compare_enum_name.to_string();
            let cube_filter = c.filter_name.to_string();
            let mut leaf_field = Field::new(
                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
                move |ctx| {
                    let col = col.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        // When an aggregation-style argument is present, the
                        // value lives under a dim-agg key derived from the
                        // field alias rather than under the raw column name.
                        let has_interval = ctx.args.try_get("interval").is_ok();
                        let has_max = ctx.args.try_get("maximum").is_ok();
                        let has_min = ctx.args.try_get("minimum").is_ok();
                        let key = if has_interval || has_max || has_min {
                            let name = ctx.ctx.field().alias().unwrap_or(&col);
                            dim_agg_key(name)
                        } else {
                            col.clone()
                        };
                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                        // DateTime values are normalized to ISO-8601 strings.
                        let gql_val = if is_datetime {
                            json_to_gql_datetime(val)
                        } else {
                            json_to_gql_value(val)
                        };
                        Ok(Some(FieldValue::value(gql_val)))
                    })
                },
            );
            if let Some(desc) = &dim.description {
                leaf_field = leaf_field.description(desc);
            }
            // Every leaf accepts argMax/argMin, a conditional filter, and a
            // post-aggregation (HAVING) filter.
            leaf_field = leaf_field
                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is maximum (argMax)"))
                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is minimum (argMin)"))
                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
                    .description("Conditional filter for aggregation"))
                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
                    .description("Post-aggregation value filter (HAVING)"));
            if is_datetime {
                // Only datetime leaves can be bucketed into time intervals.
                leaf_field = leaf_field
                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
            }
            c.record_fields.push(leaf_field);
            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
        }
        DimensionNode::Group { graphql_name, description, children } => {
            // NOTE(review): `full_path` and `new_prefix` below are computed
            // identically — candidates for deduplication.
            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);

            let mut child_record_fields: Vec<Field> = Vec::new();
            let mut child_filter_fields: Vec<InputValue> = Vec::new();
            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };

            // Recurse with fresh field buffers but the shared extra-type
            // sinks, so nested types register at the cube level.
            let mut child_collector = DimCollector {
                cube_name: c.cube_name,
                compare_enum_name: c.compare_enum_name,
                filter_name: c.filter_name,
                record_fields: &mut child_record_fields,
                filter_fields: &mut child_filter_fields,
                extra_objects: c.extra_objects,
                extra_inputs: c.extra_inputs,
                extra_unions: c.extra_unions,
            };
            for child in children {
                collect_dimension_types(child, &new_prefix, &mut child_collector);
            }

            let mut nested_record = Object::new(&nested_record_name);
            for f in child_record_fields { nested_record = nested_record.field(f); }

            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
            let mut nested_filter = InputObject::new(&nested_filter_name)
                .description(nested_filter_desc);
            for f in child_filter_fields { nested_filter = nested_filter.field(f); }

            // The group resolver just re-exposes the parent row; each child
            // leaf reads its own column from the same flat RowMap.
            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
                FieldFuture::new(async move {
                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                    Ok(Some(FieldValue::owned_any(row.clone())))
                })
            });
            if let Some(desc) = description {
                group_field = group_field.description(desc);
            }
            c.record_fields.push(group_field);
            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
            c.extra_objects.push(nested_record);
            c.extra_inputs.push(nested_filter);
        }
        DimensionNode::Array { graphql_name, description, children } => {
            let full_path = if prefix.is_empty() {
                graphql_name.clone()
            } else {
                format!("{prefix}_{graphql_name}")
            };
            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);

            let mut element_obj = Object::new(&element_type_name);
            let mut includes_filter = InputObject::new(&includes_filter_name)
                .description(format!("Element-level filter for {} (used with includes)", graphql_name));

            // (union GraphQL name, the union type, its variant objects) —
            // registered on the collector after the child loop.
            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();

            for child in children {
                match &child.field_type {
                    // Plain scalar array column: element field reads its
                    // value straight out of the per-element RowMap.
                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
                        let col_name = child.column.clone();
                        let is_datetime = *dt == DimType::DateTime;
                        let mut field = Field::new(
                            &child.graphql_name,
                            dim_type_to_typeref(dt),
                            move |ctx| {
                                let col = col_name.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    let gql_val = if is_datetime {
                                        json_to_gql_datetime(val)
                                    } else {
                                        json_to_gql_value(val)
                                    };
                                    Ok(Some(FieldValue::value(gql_val)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);
                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
                        );
                    }
                    // Union-typed array column: value's concrete GraphQL type
                    // is resolved at runtime from a sibling "Type" column.
                    crate::cube::definition::ArrayFieldType::Union(variants) => {
                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);

                        let mut gql_union = Union::new(&union_name);
                        let mut variant_objects = Vec::new();

                        // One object type per variant, all reading the same
                        // value column but under a variant-specific field.
                        for v in variants {
                            let variant_obj_name = v.type_name.clone();
                            let field_name = v.field_name.clone();
                            let source_type = v.source_type.clone();
                            let type_ref = dim_type_to_typeref(&source_type);

                            let val_col = child.column.clone();
                            let variant_obj = Object::new(&variant_obj_name)
                                .field(Field::new(
                                    &field_name,
                                    type_ref,
                                    move |ctx| {
                                        let col = val_col.clone();
                                        FieldFuture::new(async move {
                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
                                        })
                                    },
                                ));
                            gql_union = gql_union.possible_type(&variant_obj_name);
                            variant_objects.push(variant_obj);
                        }

                        let col_name = child.column.clone();
                        // Sibling field literally named "Type" carries the
                        // runtime type tag; empty string if none exists.
                        let type_col = children.iter()
                            .find(|f| f.graphql_name == "Type")
                            .map(|f| f.column.clone())
                            .unwrap_or_default();
                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();

                        let mut field = Field::new(
                            &child.graphql_name,
                            TypeRef::named(&union_name),
                            move |ctx| {
                                let col = col_name.clone();
                                let tcol = type_col.clone();
                                let vars = variants_clone.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    // Null / empty string means "no value" —
                                    // emit GraphQL null instead of a variant.
                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
                                        return Ok(None);
                                    }
                                    let type_str = row.get(&tcol)
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("");
                                    let resolved_type = resolve_union_typename(type_str, &vars);
                                    let mut elem = RowMap::new();
                                    elem.insert(col.clone(), raw_val);
                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);

                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
                        );

                        union_registrations.push((union_name, gql_union, variant_objects));
                    }
                }
            }

            for (_, union_type, variant_objs) in union_registrations {
                c.extra_objects.extend(variant_objs);
                c.extra_unions.push(union_type);
            }

            let child_columns: Vec<(String, String)> = children.iter()
                .map(|f| (f.graphql_name.clone(), f.column.clone()))
                .collect();
            let element_type_name_clone = element_type_name.clone();

            // The array field zips the parallel per-column JSON arrays into a
            // list of per-element RowMaps.
            let mut array_field = Field::new(
                graphql_name,
                TypeRef::named_nn_list_nn(&element_type_name),
                move |ctx| {
                    let cols = child_columns.clone();
                    // NOTE(review): `_etype` is cloned but never used —
                    // candidate for removal.
                    let _etype = element_type_name_clone.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
                            .map(|(gql_name, col)| {
                                let arr = row.get(col)
                                    .and_then(|v| v.as_array())
                                    .cloned()
                                    .unwrap_or_default();
                                (gql_name.as_str(), arr)
                            })
                            .collect();

                        // Element count follows the first column's array;
                        // missing values in shorter arrays become Null.
                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
                        let mut elements = Vec::with_capacity(len);
                        for i in 0..len {
                            let mut elem = RowMap::new();
                            // Store each value under both the GraphQL name
                            // and the raw column name, so element resolvers
                            // can look up either key.
                            for (gql_name, arr) in &arrays {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(gql_name.to_string(), val);
                            }
                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(col.clone(), val);
                            }
                            elements.push(FieldValue::owned_any(elem));
                        }
                        Ok(Some(FieldValue::list(elements)))
                    })
                },
            );
            if let Some(desc) = description {
                array_field = array_field.description(desc);
            }
            c.record_fields.push(array_field);

            // Wrapper filter exposing `includes`: match rows where at least
            // one element satisfies all element-level conditions.
            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
            let wrapper_filter = InputObject::new(&wrapper_filter_name)
                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
                    .description("Match rows where at least one array element satisfies all conditions"));
            c.extra_inputs.push(wrapper_filter);

            c.filter_fields.push(InputValue::new(
                graphql_name,
                TypeRef::named(&wrapper_filter_name),
            ));

            c.extra_objects.push(element_obj);
            c.extra_inputs.push(includes_filter);
        }
    }
}
1478
1479fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1481 let fallback = variants.last().map(|v| v.type_name.clone()).unwrap_or_default();
1482 match type_str {
1483 "u8" | "u16" | "u32" | "i32" => {
1484 variants.iter().find(|v| v.field_name == "integer")
1485 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1486 }
1487 "u64" | "u128" | "i64" => {
1488 variants.iter().find(|v| v.field_name == "bigInteger")
1489 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1490 }
1491 "string" => {
1492 variants.iter().find(|v| v.field_name == "string")
1493 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1494 }
1495 "publicKey" | "pubkey" => {
1496 variants.iter().find(|v| v.field_name == "address")
1497 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1498 }
1499 "bool" => {
1500 variants.iter().find(|v| v.field_name == "bool")
1501 .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1502 }
1503 _ => fallback,
1504 }
1505}
1506
1507fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1508 match dt {
1509 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
1510 DimType::Int => TypeRef::named(TypeRef::INT),
1511 DimType::Float => TypeRef::named(TypeRef::FLOAT),
1512 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1513 }
1514}
1515
1516fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1517 match dt {
1518 DimType::String => "StringFilter",
1519 DimType::Int => "IntFilter",
1520 DimType::Float => "FloatFilter",
1521 DimType::DateTime => "DateTimeFilter",
1522 DimType::Bool => "BoolFilter",
1523 }
1524}
1525
1526pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1527 match v {
1528 serde_json::Value::Null => Value::Null,
1529 serde_json::Value::Bool(b) => Value::from(b),
1530 serde_json::Value::Number(n) => {
1531 if let Some(i) = n.as_i64() { Value::from(i) }
1532 else if let Some(f) = n.as_f64() { Value::from(f) }
1533 else { Value::from(n.to_string()) }
1534 }
1535 serde_json::Value::String(s) => Value::from(s),
1536 _ => Value::from(v.to_string()),
1537 }
1538}
1539
1540fn build_join_expr(
1543 jd: &crate::cube::definition::JoinDef,
1544 target_cube: &CubeDefinition,
1545 sub_field: &async_graphql::SelectionField<'_>,
1546 network: &str,
1547 join_idx: usize,
1548) -> JoinExpr {
1549 let target_flat = target_cube.flat_dimensions();
1550 let target_table = target_cube.table_for_chain(network);
1551
1552 let mut requested_paths = HashSet::new();
1553 collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1554
1555 let mut selects: Vec<SelectExpr> = target_flat.iter()
1556 .filter(|(path, _)| requested_paths.contains(path))
1557 .map(|(_, dim)| SelectExpr::Column {
1558 column: dim.column.clone(),
1559 alias: None,
1560 })
1561 .collect();
1562
1563 if selects.is_empty() {
1564 selects = target_flat.iter()
1565 .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1566 .collect();
1567 }
1568
1569 let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
1570
1571 let group_by = if is_aggregate {
1572 let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1573 for sel in &selects {
1574 if let SelectExpr::Column { column, .. } = sel {
1575 if !column.contains('(') && !gb.contains(column) {
1576 gb.push(column.clone());
1577 }
1578 }
1579 }
1580 gb
1581 } else {
1582 vec![]
1583 };
1584
1585 JoinExpr {
1586 schema: target_cube.schema.clone(),
1587 table: target_table,
1588 alias: format!("_j{}", join_idx),
1589 conditions: jd.conditions.clone(),
1590 selects,
1591 group_by,
1592 use_final: target_cube.use_final,
1593 is_aggregate,
1594 target_cube: jd.target_cube.clone(),
1595 join_field: sub_field.name().to_string(),
1596 join_type: jd.join_type.clone(),
1597 }
1598}
1599
1600fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1603 match v {
1604 serde_json::Value::String(s) => {
1605 let iso = if s.contains('T') {
1606 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1607 } else {
1608 let replaced = s.replacen(' ', "T", 1);
1609 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1610 };
1611 Value::from(iso)
1612 }
1613 other => json_to_gql_value(other),
1614 }
1615}