1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Internal row key under which an aggregate metric value is stored
/// ("__" + the metric's GraphQL alias).
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Internal row key under which a dimension-aggregation (argMax/argMin)
/// value is stored ("__da_" + the field's GraphQL alias).
pub fn dim_agg_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 5);
    key.push_str("__da_");
    key.push_str(alias);
    key
}
20
/// Async callback that executes a compiled SQL statement.
///
/// Takes the SQL text and its positional bindings, and resolves to the result
/// rows or an error message. Cloned into every cube-field resolver, so it must
/// be `Send + Sync`.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// An additional GraphQL argument exposed on a chain-group wrapper field.
///
/// Its resolved value (enum, boolean, or string) is stringified and copied
/// into `ChainContext::extra` under `name`.
#[derive(Debug, Clone)]
pub struct WrapperArg {
    // GraphQL argument name (also the key used in `ChainContext::extra`).
    pub name: String,
    // GraphQL type reference for the argument (e.g. a scalar or enum name).
    pub type_ref: String,
    // Human-readable description shown in the schema.
    pub description: String,
    // Optional allowed values — presumably enum items for `type_ref`;
    // NOTE(review): not read anywhere in this file, confirm intended use.
    pub values: Option<Vec<String>>,
}
43
/// Configuration for one chain-group wrapper object on the root query.
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    // Wrapper type/field name on the root query (e.g. "EVM").
    pub name: String,
    // Which cube chain group this wrapper exposes; cubes whose
    // `chain_groups` contain this value become fields of the wrapper.
    pub group: ChainGroup,
    // Networks in this group; the first entry is the default network when
    // no `network` argument is supplied.
    pub networks: Vec<String>,
    // When true, the wrapper field accepts a `network` enum argument and a
    // "{name}Network" enum is registered (unless overridden below).
    pub has_network_arg: bool,
    // Extra wrapper-field arguments whose values are passed through to
    // resolvers via `ChainContext::extra`.
    pub extra_args: Vec<WrapperArg>,
    // Optional override for the network enum's type name
    // (default: "{name}Network").
    pub network_enum_name: Option<String>,
}
64
/// Maps a logical table name plus the request's chain context to the physical
/// table name used in the generated SQL. Stored as schema data when set.
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
69
/// Top-level options for `build_schema`.
pub struct SchemaConfig {
    // Chain-group wrappers to expose on the root query object.
    pub chain_groups: Vec<ChainGroupConfig>,
    // NOTE(review): not referenced by `build_schema` in this file — the root
    // object is hardcoded to "Query". Confirm whether this is vestigial.
    pub root_query_name: String,
    // Default stats callback; a callback found in request context data takes
    // precedence over this one.
    pub stats_callback: Option<StatsCallback>,
    // Optional logical→physical table-name mapping, injected as schema data.
    pub table_name_transform: Option<TableNameTransform>,
    // Extra enums to register (e.g. types referenced by wrapper extra_args).
    pub extra_types: Vec<Enum>,
}
82
83impl Default for SchemaConfig {
84 fn default() -> Self {
85 Self {
86 chain_groups: vec![
87 ChainGroupConfig {
88 name: "EVM".to_string(),
89 group: ChainGroup::Evm,
90 networks: vec!["eth".into(), "bsc".into()],
91 has_network_arg: true,
92 extra_args: vec![],
93 network_enum_name: None,
94 },
95 ChainGroupConfig {
96 name: "Solana".to_string(),
97 group: ChainGroup::Solana,
98 networks: vec!["sol".into()],
99 has_network_arg: false,
100 extra_args: vec![],
101 network_enum_name: None,
102 },
103 ChainGroupConfig {
104 name: "Trading".to_string(),
105 group: ChainGroup::Trading,
106 networks: vec!["sol".into(), "eth".into(), "bsc".into()],
107 has_network_arg: false,
108 extra_args: vec![],
109 network_enum_name: None,
110 },
111 ],
112 root_query_name: "ChainStream".to_string(),
113 stats_callback: None,
114 table_name_transform: None,
115 extra_types: vec![],
116 }
117 }
118}
119
/// Per-request context produced by a chain-group wrapper resolver and read by
/// cube-field resolvers via parent-value downcast.
pub struct ChainContext {
    // Resolved network identifier (from the `network` argument, or the
    // group's default network).
    pub network: String,
    // Stringified values of the wrapper's extra arguments, keyed by name.
    pub extra: HashMap<String, String>,
}
127
/// Builds the complete dynamic GraphQL schema from the cube registry.
///
/// In order:
/// 1. registers shared primitives (limit input, ordering/time enums, filter
///    primitives, DateTime scalars, `TimeIntervalInput`, `DimSelectWhere`)
///    plus any `config.extra_types`;
/// 2. registers one network-selector enum per chain group with a network arg;
/// 3. registers every cube's generated types, deduplicated by cube name;
/// 4. builds the root `Query` object: one wrapper field per chain group whose
///    resolver yields a `ChainContext`, plus the internal `_cubeMetadata`
///    JSON field;
/// 5. stores `registry` (and the optional table-name transform) as schema
///    data for resolvers to fetch at request time.
///
/// NOTE(review): `config.root_query_name` is never read here — the root type
/// name is hardcoded to "Query". Confirm whether that field is vestigial.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    let mut builder = Schema::build("Query", None, None);

    // Shared input/enum/scalar types used by every cube.
    builder = builder.register(filter_types::build_limit_input());
    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }
    builder = builder.register(
        Scalar::new("DateTime")
            .description("ISO 8601 date-time string (e.g. \"2026-04-01T00:00:00Z\")"),
    );
    builder = builder.register(
        Scalar::new("ISO8601DateTime")
            .description("ISO 8601 date-time string (alias for DateTime, V1 compatibility)"),
    );
    builder = builder.register(
        Enum::new("TimeUnit")
            .description("Time unit for interval bucketing")
            .item(EnumItem::new("seconds"))
            .item(EnumItem::new("minutes"))
            .item(EnumItem::new("hours"))
            .item(EnumItem::new("days"))
            .item(EnumItem::new("weeks"))
            .item(EnumItem::new("months")),
    );
    builder = builder.register(
        InputObject::new("TimeIntervalInput")
            .description("Time bucketing interval for DateTime dimensions")
            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
    );
    builder = builder.register(
        InputObject::new("DimSelectWhere")
            .description("Post-aggregation filter for dimension values (HAVING clause)")
            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
    );

    // Caller-supplied enums (e.g. the types behind wrapper extra_args).
    for extra_enum in config.extra_types {
        builder = builder.register(extra_enum);
    }

    // One network-selector enum per group that exposes a `network` argument.
    for grp in &config.chain_groups {
        if grp.has_network_arg {
            let enum_name = grp.network_enum_name.clone()
                .unwrap_or_else(|| format!("{}Network", grp.name));
            let mut net_enum = Enum::new(&enum_name)
                .description(format!("{} network selector", grp.name));
            for net in &grp.networks {
                net_enum = net_enum.item(EnumItem::new(net));
            }
            builder = builder.register(net_enum);
        }
    }

    // Register each cube's generated types exactly once, even when the cube
    // appears in several chain groups.
    let mut registered_cubes: HashSet<String> = HashSet::new();
    for cube in registry.cubes() {
        if registered_cubes.contains(&cube.name) { continue; }
        registered_cubes.insert(cube.name.clone());

        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }
        for un in types.unions { builder = builder.register(un); }
    }

    let mut query = Object::new("Query");

    for grp in &config.chain_groups {
        let wrapper_type_name = grp.name.clone();

        // Wrapper object: one field per cube that belongs to this group.
        let mut wrapper_obj = Object::new(&wrapper_type_name);

        for cube in registry.cubes() {
            if !cube.chain_groups.contains(&grp.group) { continue; }

            let cube_field = build_cube_field(
                cube,
                dialect.clone(),
                executor.clone(),
                config.stats_callback.clone(),
            );
            wrapper_obj = wrapper_obj.field(cube_field);
        }
        builder = builder.register(wrapper_obj);

        // First network in the group is the fallback when the `network`
        // argument is absent (or the group takes no such argument).
        let default_network = grp.networks.first().cloned().unwrap_or_default();
        let has_net_arg = grp.has_network_arg;
        let net_enum_name = grp.network_enum_name.clone()
            .unwrap_or_else(|| format!("{}Network", grp.name));
        let wrapper_type_for_resolver = wrapper_type_name.clone();
        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();

        // Root-field resolver: packages the chosen network plus any extra
        // arguments (stringified) into a ChainContext for child resolvers.
        let mut wrapper_field = Field::new(
            &wrapper_type_name,
            TypeRef::named_nn(&wrapper_type_name),
            move |ctx| {
                let default = default_network.clone();
                let has_arg = has_net_arg;
                let arg_names = extra_arg_names.clone();
                FieldFuture::new(async move {
                    let network = if has_arg {
                        ctx.args.try_get("network")
                            .ok()
                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
                            .unwrap_or(default)
                    } else {
                        default
                    };
                    let mut extra = HashMap::new();
                    for arg_name in &arg_names {
                        if let Ok(val) = ctx.args.try_get(arg_name) {
                            // Accept enum, boolean, or string argument values;
                            // anything else is silently skipped.
                            let resolved = val.enum_name().ok().map(|s| s.to_string())
                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
                                .or_else(|| val.string().ok().map(|s| s.to_string()));
                            if let Some(v) = resolved {
                                extra.insert(arg_name.clone(), v);
                            }
                        }
                    }
                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
                })
            },
        );

        if grp.has_network_arg {
            wrapper_field = wrapper_field.argument(
                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
                    .description(format!("{} network to query", wrapper_type_for_resolver)),
            );
        }
        for wa in &grp.extra_args {
            wrapper_field = wrapper_field.argument(
                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
                    .description(&wa.description),
            );
        }

        query = query.field(wrapper_field);
    }

    // `_cubeMetadata`: JSON string describing every cube, grouped by chain
    // group, for introspection tooling.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
        .map(|g| (g.name.clone(), g.group.clone()))
        .collect();
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            let groups = metadata_groups.clone();
            FieldFuture::new(async move {
                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
                for (group_name, group_enum) in &groups {
                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
                        .filter(|c| c.chain_groups.contains(group_enum))
                        .map(serialize_cube_metadata)
                        .collect();
                    group_metadata.push(serde_json::json!({
                        "group": group_name,
                        "cubes": cubes_in_group,
                    }));
                }
                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes grouped by chain");
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Expose the registry (and optional table-name transform) to resolvers
    // through schema context data.
    builder = builder.data(registry);

    if let Some(transform) = config.table_name_transform.clone() {
        builder = builder.data(transform);
    }

    builder.finish()
}
344
/// Serializes one cube definition into the JSON shape returned by the
/// `_cubeMetadata` introspection field.
///
/// Enum-like values (chain groups, dim/join types) are rendered with their
/// `Debug` representation; optional metric fields (`expressionTemplate`,
/// `description`) are only present when set.
fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
    serde_json::json!({
        "name": cube.name,
        "description": cube.description,
        "schema": cube.schema,
        "tablePattern": cube.table_pattern,
        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
        "metrics": cube.metrics.iter().map(|m| {
            let mut obj = serde_json::json!({
                "name": m.name,
                "returnType": format!("{:?}", m.return_type),
            });
            // Optional keys are added only when the metric defines them.
            if let Some(ref tmpl) = m.expression_template {
                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
            }
            if let Some(ref desc) = m.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        }).collect::<Vec<_>>(),
        "selectors": cube.selectors.iter().map(|s| {
            serde_json::json!({
                "name": s.graphql_name,
                "column": s.column,
                "type": format!("{:?}", s.dim_type),
            })
        }).collect::<Vec<_>>(),
        // Dimensions are serialized recursively (leaf/group/array nodes).
        "dimensions": serialize_dims(&cube.dimensions),
        "joins": cube.joins.iter().map(|j| {
            serde_json::json!({
                "field": j.field_name,
                "target": j.target_cube,
                "joinType": format!("{:?}", j.join_type),
            })
        }).collect::<Vec<_>>(),
        "tableRoutes": cube.table_routes.iter().map(|r| {
            serde_json::json!({
                "schema": r.schema,
                "tablePattern": r.table_pattern,
                "availableColumns": r.available_columns,
                "priority": r.priority,
            })
        }).collect::<Vec<_>>(),
        "defaultLimit": cube.default_limit,
        "maxLimit": cube.max_limit,
    })
}
392
/// Builds the GraphQL field for one cube.
///
/// The resolver: reads the parent wrapper's `ChainContext`, extracts metric /
/// quantile / calculate / alias / dim-agg / time-interval requests from the
/// selection set, parses them into the cube IR, attaches join sub-queries,
/// applies the optional table-name transform, validates, compiles to SQL via
/// `dialect`, runs it through `executor`, then post-processes rows (alias
/// remap, nesting of `"{alias}."`-prefixed join columns into sub-objects)
/// before emitting them as `RowMap` field values. Query stats go to the
/// context-provided callback, falling back to the one captured at build time.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // Network comes from the parent wrapper's ChainContext.
                // NOTE(review): falls back to "sol" when no context is
                // present — confirm this default is intended everywhere.
                let chain_ctx = ctx.parent_value
                    .try_downcast_ref::<ChainContext>().ok();
                let network = chain_ctx
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Collect everything the parser needs from the GraphQL
                // selection set before building the IR.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // Attach a join expression for each selected sub-field that
                // matches a configured join; unknown targets are skipped.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                // Optional logical→physical table-name mapping from schema data.
                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
                    if let Some(cctx) = chain_ctx {
                        ir.table = transform(&ir.table, cctx);
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Undo any column-alias renaming the dialect applied; an
                // existing key under the original name is not overwritten.
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Fold "{join_alias}.xxx" columns into a nested JSON object
                // stored under the join's GraphQL field name.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // Context-provided stats callback wins over the build-time one.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    // Standard cube arguments: filter, pagination, per-group limit, ordering.
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named(format!("{}LimitByInput", cube.name)))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // Shorthand top-level filter argument per selector dimension.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
548
/// Recursively serializes a dimension tree into the JSON form used by
/// `_cubeMetadata`. Leaf, group, and array nodes each get their own shape;
/// `description` keys appear only when set.
fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
    serde_json::Value::Array(dims.iter().map(|d| match d {
        // Leaf: name, backing column, and the dimension type's Debug name.
        DimensionNode::Leaf(dim) => {
            let mut obj = serde_json::json!({
                "name": dim.graphql_name,
                "column": dim.column,
                "type": format!("{:?}", dim.dim_type),
            });
            if let Some(desc) = &dim.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        // Group: nested children serialized recursively.
        DimensionNode::Group { graphql_name, description, children } => {
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "children": serialize_dims(children),
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        // Array: fields are either scalar-typed or unions over variants.
        DimensionNode::Array { graphql_name, description, children } => {
            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
                let type_value = match &f.field_type {
                    crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
                        "kind": "scalar",
                        "scalarType": format!("{:?}", dt),
                    }),
                    crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
                        "kind": "union",
                        "variants": variants.iter().map(|v| serde_json::json!({
                            "typeName": v.type_name,
                            "fieldName": v.field_name,
                            "sourceType": format!("{:?}", v.source_type),
                        })).collect::<Vec<_>>(),
                    }),
                };
                let mut field_obj = serde_json::json!({
                    "name": f.graphql_name,
                    "column": f.column,
                    "type": type_value,
                });
                if let Some(desc) = &f.description {
                    field_obj["description"] = serde_json::Value::String(desc.clone());
                }
                field_obj
            }).collect();
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "kind": "array",
                "fields": fields,
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
    }).collect())
}
610
611fn extract_metric_requests(
615 ctx: &async_graphql::dynamic::ResolverContext,
616 cube: &CubeDefinition,
617) -> Vec<compiler::parser::MetricRequest> {
618 let mut requests = Vec::new();
619
620 for sub_field in ctx.ctx.field().selection_set() {
621 let name = sub_field.name();
622 if !cube.has_metric(name) {
623 continue;
624 }
625
626 let args = match sub_field.arguments() {
627 Ok(args) => args,
628 Err(_) => continue,
629 };
630
631 let of_dimension = args
632 .iter()
633 .find(|(k, _)| k.as_str() == "of")
634 .and_then(|(_, v)| match v {
635 async_graphql::Value::Enum(e) => Some(e.to_string()),
636 async_graphql::Value::String(s) => Some(s.clone()),
637 _ => None,
638 })
639 .unwrap_or_else(|| "*".to_string());
640
641 let select_where_value = args
642 .iter()
643 .find(|(k, _)| k.as_str() == "selectWhere")
644 .map(|(_, v)| v.clone());
645
646 let condition_filter = args
647 .iter()
648 .find(|(k, _)| k.as_str() == "if")
649 .and_then(|(_, v)| {
650 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
651 .and_then(|f| if f.is_empty() { None } else { Some(f) })
652 });
653
654 let gql_alias = sub_field.alias().unwrap_or(name).to_string();
655 requests.push(compiler::parser::MetricRequest {
656 function: name.to_string(),
657 alias: gql_alias,
658 of_dimension,
659 select_where_value,
660 condition_filter,
661 });
662 }
663
664 requests
665}
666
667fn extract_requested_fields(
668 ctx: &async_graphql::dynamic::ResolverContext,
669 cube: &CubeDefinition,
670) -> HashSet<String> {
671 let mut fields = HashSet::new();
672 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
673 fields
674}
675
676fn collect_selection_paths(
677 field: &async_graphql::SelectionField<'_>,
678 prefix: &str,
679 out: &mut HashSet<String>,
680 metrics: &[MetricDef],
681) {
682 for sub in field.selection_set() {
683 let name = sub.name();
684 if metrics.iter().any(|m| m.name == name) {
685 continue;
686 }
687 let path = if prefix.is_empty() {
688 name.to_string()
689 } else {
690 format!("{prefix}_{name}")
691 };
692 let has_children = sub.selection_set().next().is_some();
693 if has_children {
694 collect_selection_paths(&sub, &path, out, metrics);
695 } else {
696 out.insert(path);
697 }
698 }
699}
700
/// Pairs of (aliased flattened path, source column) collected from GraphQL
/// field aliases on leaf dimensions.
pub type FieldAliasMap = Vec<(String, String)>;
704
/// A `quantile` field request parsed from the selection set.
pub struct QuantileRequest {
    // GraphQL alias for the result (defaults to "quantile").
    pub alias: String,
    // Dimension to compute the quantile over ("*" when unspecified).
    pub of_dimension: String,
    // Quantile level from the `level` argument; defaults to 0.5 (median).
    pub level: f64,
}
711
/// A `calculate` field request (free-form expression) from the selection set.
pub struct CalculateRequest {
    // GraphQL alias for the result (defaults to "calculate").
    pub alias: String,
    // Expression text from the required `expression` string argument.
    pub expression: String,
}
717
/// A time-bucketing request attached to a dimension leaf via the `interval`
/// argument (`{ in: <TimeUnit>, count: <Int> }`).
pub struct TimeIntervalRequest {
    // Flattened dimension path ("a_b_c") of the bucketed field.
    pub field_path: String,
    // GraphQL alias used for the field (defaults to the field name).
    pub graphql_alias: String,
    // Backing column of the dimension.
    pub column: String,
    // Time unit from `interval.in` (e.g. "minutes").
    pub unit: String,
    // Number of units per bucket from `interval.count`.
    pub count: i64,
}
727
/// A dimension-aggregation request (argMax/argMin) produced by a `maximum`
/// or `minimum` argument on a dimension leaf.
pub struct DimAggRequest {
    // Flattened path of the selected (value) dimension.
    pub field_path: String,
    // GraphQL alias for the result (defaults to `field_path`).
    pub graphql_alias: String,
    // Column whose value is returned.
    pub value_column: String,
    // ArgMax for `maximum`, ArgMin for `minimum`.
    pub agg_type: DimAggType,
    // Column of the dimension named by the maximum/minimum argument,
    // i.e. the one being maximized/minimized.
    pub compare_column: String,
    // Optional non-empty `if` filter for this aggregation.
    pub condition_filter: Option<FilterNode>,
    // Raw `selectWhere` argument value, if provided.
    pub select_where_value: Option<async_graphql::Value>,
}
739
740fn extract_quantile_requests(
741 ctx: &async_graphql::dynamic::ResolverContext,
742) -> Vec<QuantileRequest> {
743 let mut requests = Vec::new();
744 for sub_field in ctx.ctx.field().selection_set() {
745 if sub_field.name() != "quantile" { continue; }
746 let args = match sub_field.arguments() {
747 Ok(a) => a,
748 Err(_) => continue,
749 };
750 let of_dim = args.iter()
751 .find(|(k, _)| k.as_str() == "of")
752 .and_then(|(_, v)| match v {
753 async_graphql::Value::Enum(e) => Some(e.to_string()),
754 async_graphql::Value::String(s) => Some(s.clone()),
755 _ => None,
756 })
757 .unwrap_or_else(|| "*".to_string());
758 let level = args.iter()
759 .find(|(k, _)| k.as_str() == "level")
760 .and_then(|(_, v)| match v {
761 async_graphql::Value::Number(n) => n.as_f64(),
762 _ => None,
763 })
764 .unwrap_or(0.5);
765 let alias = sub_field.alias().unwrap_or("quantile").to_string();
766 requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
767 }
768 requests
769}
770
771fn extract_field_aliases(
772 ctx: &async_graphql::dynamic::ResolverContext,
773 cube: &CubeDefinition,
774) -> FieldAliasMap {
775 let flat = cube.flat_dimensions();
776 let mut aliases = Vec::new();
777 collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
778 aliases
779}
780
/// Recursive helper for `extract_field_aliases`: records `(alias path,
/// column)` for aliased leaf dimension fields. Metric fields and the
/// `calculate` / `quantile` pseudo-fields are skipped.
fn collect_field_aliases(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut FieldAliasMap,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
            continue;
        }
        // Path segments are joined with '_' to match flat_dimensions() keys.
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_field_aliases(&sub, &path, flat, metrics, out);
        } else if let Some(alias) = sub.alias() {
            // Only record the alias when the path maps to a known dimension.
            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                // The alias replaces only the leaf segment of the path.
                let alias_path = if prefix.is_empty() {
                    alias.to_string()
                } else {
                    format!("{prefix}_{alias}")
                };
                out.push((alias_path, dim.column.clone()));
            }
        }
    }
}
809
810fn extract_calculate_requests(
811 ctx: &async_graphql::dynamic::ResolverContext,
812) -> Vec<CalculateRequest> {
813 let mut requests = Vec::new();
814 for sub_field in ctx.ctx.field().selection_set() {
815 if sub_field.name() != "calculate" { continue; }
816 let args = match sub_field.arguments() {
817 Ok(a) => a,
818 Err(_) => continue,
819 };
820 let expression = args.iter()
821 .find(|(k, _)| k.as_str() == "expression")
822 .and_then(|(_, v)| match v {
823 async_graphql::Value::String(s) => Some(s.clone()),
824 _ => None,
825 });
826 if let Some(expr) = expression {
827 let alias = sub_field.alias().unwrap_or("calculate").to_string();
828 requests.push(CalculateRequest { alias, expression: expr });
829 }
830 }
831 requests
832}
833
834fn extract_dim_agg_requests(
835 ctx: &async_graphql::dynamic::ResolverContext,
836 cube: &CubeDefinition,
837) -> Vec<DimAggRequest> {
838 let flat = cube.flat_dimensions();
839 let mut requests = Vec::new();
840 collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
841 requests
842}
843
/// Recursive helper for `extract_dim_agg_requests`.
///
/// For each leaf (non-metric) field carrying a `maximum` or `minimum`
/// argument, resolves both the field's own column (value) and the column of
/// the dimension named by the argument (compare target), then emits a
/// `DimAggRequest`. `maximum` wins when both are present. Leaves whose value
/// or compare path does not resolve to a known dimension are skipped.
fn collect_dim_agg_paths(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    cube: &CubeDefinition,
    out: &mut Vec<DimAggRequest>,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        if metrics.iter().any(|m| m.name == name) {
            continue;
        }
        // Underscore-joined path matching flat_dimensions() keys.
        let path = if prefix.is_empty() {
            name.to_string()
        } else {
            format!("{prefix}_{name}")
        };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
        } else {
            let args = match sub.arguments() {
                Ok(a) => a,
                Err(_) => continue,
            };
            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");

            // `maximum` → ArgMax, else `minimum` → ArgMin; the argument names
            // the dimension path to compare on. Non-enum/string values skip
            // this leaf entirely.
            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
                let cp = match v {
                    async_graphql::Value::Enum(e) => e.to_string(),
                    async_graphql::Value::String(s) => s.clone(),
                    _ => continue,
                };
                (DimAggType::ArgMax, cp)
            } else if let Some((_, v)) = min_val {
                let cp = match v {
                    async_graphql::Value::Enum(e) => e.to_string(),
                    async_graphql::Value::String(s) => s.clone(),
                    _ => continue,
                };
                (DimAggType::ArgMin, cp)
            } else {
                continue;
            };

            // Both the selected field and the compare target must map to
            // known dimensions.
            let value_column = flat.iter()
                .find(|(p, _)| p == &path)
                .map(|(_, dim)| dim.column.clone());
            let compare_column = flat.iter()
                .find(|(p, _)| p == &compare_path)
                .map(|(_, dim)| dim.column.clone());

            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
                // Optional per-aggregation `if` filter; empty filters dropped.
                let condition_filter = args.iter()
                    .find(|(k, _)| k.as_str() == "if")
                    .and_then(|(_, v)| {
                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
                    });
                let select_where_value = args.iter()
                    .find(|(k, _)| k.as_str() == "selectWhere")
                    .map(|(_, v)| v.clone());

                let gql_alias = sub.alias()
                    .map(|a| a.to_string())
                    .unwrap_or_else(|| path.clone());
                out.push(DimAggRequest {
                    field_path: path,
                    graphql_alias: gql_alias,
                    value_column: vc,
                    agg_type,
                    compare_column: cc,
                    condition_filter,
                    select_where_value,
                });
            }
        }
    }
}
925
926fn extract_time_interval_requests(
927 ctx: &async_graphql::dynamic::ResolverContext,
928 cube: &CubeDefinition,
929) -> Vec<TimeIntervalRequest> {
930 let flat = cube.flat_dimensions();
931 let mut requests = Vec::new();
932 collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
933 requests
934}
935
/// Recursive helper for `extract_time_interval_requests`: emits a
/// `TimeIntervalRequest` for each leaf dimension field that carries a valid
/// `interval` object argument with both `in` (unit) and `count` set.
fn collect_time_interval_paths(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut Vec<TimeIntervalRequest>,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        if metrics.iter().any(|m| m.name == name) { continue; }
        // Underscore-joined path matching flat_dimensions() keys.
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_time_interval_paths(&sub, &path, flat, metrics, out);
        } else {
            let args = match sub.arguments() {
                Ok(a) => a,
                Err(_) => continue,
            };
            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
                // `in` may arrive as an enum or a string.
                let unit = obj.get("in")
                    .and_then(|v| match v {
                        async_graphql::Value::Enum(e) => Some(e.to_string()),
                        async_graphql::Value::String(s) => Some(s.clone()),
                        _ => None,
                    });
                let count = obj.get("count")
                    .and_then(|v| match v {
                        async_graphql::Value::Number(n) => n.as_i64(),
                        _ => None,
                    });
                // Both parts required, and the path must be a known dimension.
                if let (Some(unit), Some(count)) = (unit, count) {
                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                        let gql_alias = sub.alias().unwrap_or(name).to_string();
                        out.push(TimeIntervalRequest {
                            field_path: path,
                            graphql_alias: gql_alias,
                            column: dim.column.clone(),
                            unit,
                            count,
                        });
                    }
                }
            }
        }
    }
}
984
/// Dynamic GraphQL types generated for one cube, all of which are registered
/// on the schema by `build_schema`.
struct CubeTypes {
    // Object types for the cube (record type and related objects).
    objects: Vec<Object>,
    // Input types (filters, selectWhere inputs, etc.).
    inputs: Vec<InputObject>,
    // Enums (compare-field enums, per-metric `of` enums).
    enums: Vec<Enum>,
    // Union types used by array dimension fields.
    unions: Vec<Union>,
}
995
996fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
997 let record_name = format!("{}Record", cube.name);
998 let filter_name = format!("{}Filter", cube.name);
999 let compare_enum_name = format!("{}CompareFields", cube.name);
1000
1001 let flat_dims = cube.flat_dimensions();
1002
1003 let mut compare_enum = Enum::new(&compare_enum_name)
1004 .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
1005 for (path, _) in &flat_dims {
1006 compare_enum = compare_enum.item(EnumItem::new(path));
1007 }
1008
1009 let mut record_fields: Vec<Field> = Vec::new();
1010 let mut filter_fields: Vec<InputValue> = Vec::new();
1011 let mut extra_objects: Vec<Object> = Vec::new();
1012 let mut extra_inputs: Vec<InputObject> = Vec::new();
1013 let mut extra_unions: Vec<Union> = Vec::new();
1014
1015 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
1016 .description("OR combinator — matches if any sub-filter matches"));
1017
1018 {
1019 let mut collector = DimCollector {
1020 cube_name: &cube.name,
1021 compare_enum_name: &compare_enum_name,
1022 filter_name: &filter_name,
1023 record_fields: &mut record_fields,
1024 filter_fields: &mut filter_fields,
1025 extra_objects: &mut extra_objects,
1026 extra_inputs: &mut extra_inputs,
1027 extra_unions: &mut extra_unions,
1028 };
1029 for node in &cube.dimensions {
1030 collect_dimension_types(node, "", &mut collector);
1031 }
1032 }
1033
1034 let mut metric_enums: Vec<Enum> = Vec::new();
1035 let builtin_descs: std::collections::HashMap<&str, &str> = [
1036 ("count", "Count of rows or distinct values"),
1037 ("sum", "Sum of values"),
1038 ("avg", "Average of values"),
1039 ("min", "Minimum value"),
1040 ("max", "Maximum value"),
1041 ("uniq", "Count of unique (distinct) values"),
1042 ].into_iter().collect();
1043
1044 for metric in &cube.metrics {
1045 let metric_name = &metric.name;
1046 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1047
1048 if metric.supports_where {
1049 extra_inputs.push(
1050 InputObject::new(&select_where_name)
1051 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1052 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1053 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1054 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1055 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1056 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1057 );
1058 }
1059
1060 let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1061 let mut of_enum = Enum::new(&of_enum_name)
1062 .description(format!("Dimension to apply {} aggregation on", metric_name));
1063 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1064 metric_enums.push(of_enum);
1065
1066 let metric_name_clone = metric_name.clone();
1067 let return_type_ref = dim_type_to_typeref(&metric.return_type);
1068 let metric_desc = metric.description.as_deref()
1069 .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1070 .unwrap_or("Aggregate metric");
1071
1072 let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1073 let default_name = metric_name_clone.clone();
1074 FieldFuture::new(async move {
1075 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1076 let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1077 let key = metric_key(alias);
1078 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1079 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1080 })
1081 })
1082 .description(metric_desc)
1083 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1084 .description("Dimension to aggregate on (default: all rows)"));
1085
1086 if metric.supports_where {
1087 metric_field = metric_field
1088 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1089 .description("Post-aggregation filter (HAVING)"))
1090 .argument(InputValue::new("if", TypeRef::named(&filter_name))
1091 .description("Conditional filter for this metric"));
1092 }
1093
1094 record_fields.push(metric_field);
1095 }
1096
1097 {
1099 let of_enum_name = format!("{}_quantile_Of", cube.name);
1100 let mut of_enum = Enum::new(&of_enum_name)
1101 .description(format!("Dimension to apply quantile on for {}", cube.name));
1102 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1103 metric_enums.push(of_enum);
1104
1105 let of_enum_for_closure = of_enum_name.clone();
1106 let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1107 FieldFuture::new(async move {
1108 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1109 let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1110 let key = metric_key(alias);
1111 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1112 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1113 })
1114 })
1115 .description("Compute a quantile (percentile) of a dimension")
1116 .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1117 .description("Dimension to compute quantile on"))
1118 .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1119 .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1120 record_fields.push(quantile_field);
1121 }
1122
1123 {
1125 let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1126 FieldFuture::new(async move {
1127 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1128 let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1129 let key = metric_key(alias);
1130 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1131 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1132 })
1133 })
1134 .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1135 .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1136 .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1137 record_fields.push(calculate_field);
1138 }
1139
1140 for jd in &cube.joins {
1142 let target_record_name = format!("{}Record", jd.target_cube);
1143 let field_name_owned = jd.field_name.clone();
1144 let mut join_field = Field::new(
1145 &jd.field_name,
1146 TypeRef::named(&target_record_name),
1147 move |ctx| {
1148 let field_name = field_name_owned.clone();
1149 FieldFuture::new(async move {
1150 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1151 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1152 let sub_row: RowMap = obj.iter()
1153 .map(|(k, v)| (k.clone(), v.clone()))
1154 .collect();
1155 Ok(Some(FieldValue::owned_any(sub_row)))
1156 } else {
1157 Ok(Some(FieldValue::value(Value::Null)))
1158 }
1159 })
1160 },
1161 );
1162 if let Some(desc) = &jd.description {
1163 join_field = join_field.description(desc);
1164 }
1165 record_fields.push(join_field);
1166 }
1167
1168 let mut record = Object::new(&record_name);
1169 for f in record_fields { record = record.field(f); }
1170
1171 let mut filter = InputObject::new(&filter_name)
1172 .description(format!("Filter conditions for {} query", cube.name));
1173 for f in filter_fields { filter = filter.field(f); }
1174
1175 let orderby_input_name = format!("{}OrderByInput", cube.name);
1176 let orderby_input = InputObject::new(&orderby_input_name)
1177 .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1178 .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1179 .description("Sort descending by this field"))
1180 .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1181 .description("Sort ascending by this field"))
1182 .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1183 .description("Sort descending by computed/aggregated field name"))
1184 .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1185 .description("Sort ascending by computed/aggregated field name"));
1186
1187 let limitby_input_name = format!("{}LimitByInput", cube.name);
1188 let limitby_input = InputObject::new(&limitby_input_name)
1189 .description(format!("Per-group row limit for {}", cube.name))
1190 .field(InputValue::new("by", TypeRef::named_nn(&compare_enum_name))
1191 .description("Dimension field to group by"))
1192 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
1193 .description("Maximum rows per group"))
1194 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
1195 .description("Rows to skip per group"));
1196
1197 let mut objects = vec![record]; objects.extend(extra_objects);
1198 let mut inputs = vec![filter, orderby_input, limitby_input]; inputs.extend(extra_inputs);
1199 let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1200
1201 CubeTypes { objects, inputs, enums, unions: extra_unions }
1202}
1203
/// Mutable accumulator threaded through `collect_dimension_types` while
/// walking a cube's dimension tree. Record/filter field buffers are scoped
/// to the object currently being built (swapped out when recursing into a
/// group), while the `extra_*` vectors collect auxiliary GraphQL types for
/// the whole cube.
struct DimCollector<'a> {
    // Cube name, used to namespace generated type names.
    cube_name: &'a str,
    // Name of the cube-wide compare enum (used by maximum/minimum args).
    compare_enum_name: &'a str,
    // Name of the cube's top-level filter input (used by `if` args).
    filter_name: &'a str,
    // Output: fields of the record object currently being built.
    record_fields: &'a mut Vec<Field>,
    // Output: fields of the filter input currently being built.
    filter_fields: &'a mut Vec<InputValue>,
    // Output: additional object types (nested records, array elements, union variants).
    extra_objects: &'a mut Vec<Object>,
    // Output: additional input types (nested filters, array/include filters).
    extra_inputs: &'a mut Vec<InputObject>,
    // Output: union types generated for array fields with union variants.
    extra_unions: &'a mut Vec<Union>,
}
1214
/// Recursively walks one dimension-tree node, pushing GraphQL record fields
/// and filter fields into the collector, plus any auxiliary types (nested
/// record objects, nested filter inputs, array element objects, unions) the
/// node requires.
///
/// `prefix` is the underscore-joined path of ancestor group names (empty at
/// the top level); it namespaces generated type names within the cube.
fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
    match node {
        DimensionNode::Leaf(dim) => {
            let col = dim.column.clone();
            let is_datetime = dim.dim_type == DimType::DateTime;
            let compare_enum = c.compare_enum_name.to_string();
            let cube_filter = c.filter_name.to_string();
            let mut leaf_field = Field::new(
                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
                move |ctx| {
                    let col = col.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        // Any aggregation argument (interval/maximum/minimum)
                        // means the compiler stored the value under a dim-agg
                        // alias key rather than the raw column name.
                        let has_interval = ctx.args.try_get("interval").is_ok();
                        let has_max = ctx.args.try_get("maximum").is_ok();
                        let has_min = ctx.args.try_get("minimum").is_ok();
                        let key = if has_interval || has_max || has_min {
                            let name = ctx.ctx.field().alias().unwrap_or(&col);
                            dim_agg_key(name)
                        } else {
                            col.clone()
                        };
                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                        let gql_val = if is_datetime {
                            json_to_gql_datetime(val)
                        } else {
                            json_to_gql_value(val)
                        };
                        Ok(Some(FieldValue::value(gql_val)))
                    })
                },
            );
            if let Some(desc) = &dim.description {
                leaf_field = leaf_field.description(desc);
            }
            // Aggregation arguments available on every leaf dimension.
            leaf_field = leaf_field
                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is maximum (argMax)"))
                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is minimum (argMin)"))
                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
                    .description("Conditional filter for aggregation"))
                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
                    .description("Post-aggregation value filter (HAVING)"));
            // Time bucketing only applies to datetime dimensions.
            if is_datetime {
                leaf_field = leaf_field
                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
            }
            c.record_fields.push(leaf_field);
            // Booleans filter by plain equality; other types use their shared
            // comparison-filter input object (StringFilter, IntFilter, ...).
            if matches!(dim.dim_type, DimType::Bool) {
                c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(TypeRef::BOOLEAN)));
            } else {
                c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
            }
        }
        DimensionNode::Group { graphql_name, description, children } => {
            // NOTE(review): `full_path` and `new_prefix` below compute the
            // same value and could be unified.
            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);

            let mut child_record_fields: Vec<Field> = Vec::new();
            let mut child_filter_fields: Vec<InputValue> = Vec::new();
            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };

            // Recurse into children with fresh record/filter buffers; the
            // auxiliary type vectors are shared with the parent collector.
            let mut child_collector = DimCollector {
                cube_name: c.cube_name,
                compare_enum_name: c.compare_enum_name,
                filter_name: c.filter_name,
                record_fields: &mut child_record_fields,
                filter_fields: &mut child_filter_fields,
                extra_objects: c.extra_objects,
                extra_inputs: c.extra_inputs,
                extra_unions: c.extra_unions,
            };
            for child in children {
                collect_dimension_types(child, &new_prefix, &mut child_collector);
            }

            let mut nested_record = Object::new(&nested_record_name);
            for f in child_record_fields { nested_record = nested_record.field(f); }

            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
            let mut nested_filter = InputObject::new(&nested_filter_name)
                .description(nested_filter_desc);
            for f in child_filter_fields { nested_filter = nested_filter.field(f); }

            // The group resolver just re-exposes the same flat row; nested
            // leaf resolvers look their columns up directly on it.
            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
                FieldFuture::new(async move {
                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                    Ok(Some(FieldValue::owned_any(row.clone())))
                })
            });
            if let Some(desc) = description {
                group_field = group_field.description(desc);
            }
            c.record_fields.push(group_field);
            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
            c.extra_objects.push(nested_record);
            c.extra_inputs.push(nested_filter);
        }
        DimensionNode::Array { graphql_name, description, children } => {
            let full_path = if prefix.is_empty() {
                graphql_name.clone()
            } else {
                format!("{prefix}_{graphql_name}")
            };
            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);

            let mut element_obj = Object::new(&element_type_name);
            let mut includes_filter = InputObject::new(&includes_filter_name)
                .description(format!("Element-level filter for {} (used with includes)", graphql_name));

            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();

            for child in children {
                match &child.field_type {
                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
                        let col_name = child.column.clone();
                        let is_datetime = *dt == DimType::DateTime;
                        let mut field = Field::new(
                            &child.graphql_name,
                            dim_type_to_typeref(dt),
                            move |ctx| {
                                let col = col_name.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    let gql_val = if is_datetime {
                                        json_to_gql_datetime(val)
                                    } else {
                                        json_to_gql_value(val)
                                    };
                                    Ok(Some(FieldValue::value(gql_val)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);
                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
                        );
                    }
                    crate::cube::definition::ArrayFieldType::Union(variants) => {
                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);

                        let mut gql_union = Union::new(&union_name);
                        let mut variant_objects = Vec::new();

                        // One single-field object per variant; all variants
                        // read the same underlying value column.
                        for v in variants {
                            let variant_obj_name = v.type_name.clone();
                            let field_name = v.field_name.clone();
                            let source_type = v.source_type.clone();
                            let type_ref = dim_type_to_typeref(&source_type);

                            let val_col = child.column.clone();
                            let variant_obj = Object::new(&variant_obj_name)
                                .field(Field::new(
                                    &field_name,
                                    type_ref,
                                    move |ctx| {
                                        let col = val_col.clone();
                                        FieldFuture::new(async move {
                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
                                        })
                                    },
                                ));
                            gql_union = gql_union.possible_type(&variant_obj_name);
                            variant_objects.push(variant_obj);
                        }

                        let col_name = child.column.clone();
                        // The sibling field named "Type" carries the
                        // discriminator column used to pick the variant.
                        let type_col = children.iter()
                            .find(|f| f.graphql_name == "Type")
                            .map(|f| f.column.clone())
                            .unwrap_or_default();
                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();

                        let mut field = Field::new(
                            &child.graphql_name,
                            TypeRef::named(&union_name),
                            move |ctx| {
                                let col = col_name.clone();
                                let tcol = type_col.clone();
                                let vars = variants_clone.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    // Null or empty string: element has no
                                    // value for the union field.
                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
                                        return Ok(None);
                                    }
                                    let type_str = row.get(&tcol)
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("");
                                    let resolved_type = resolve_union_typename(type_str, &vars);
                                    let mut elem = RowMap::new();
                                    elem.insert(col.clone(), raw_val);
                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);

                        // Union columns are filtered as raw strings.
                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
                        );

                        union_registrations.push((union_name, gql_union, variant_objects));
                    }
                }
            }

            for (_, union_type, variant_objs) in union_registrations {
                c.extra_objects.extend(variant_objs);
                c.extra_unions.push(union_type);
            }

            let child_columns: Vec<(String, String)> = children.iter()
                .map(|f| (f.graphql_name.clone(), f.column.clone()))
                .collect();
            let element_type_name_clone = element_type_name.clone();

            // Array fields arrive as parallel per-column arrays; the resolver
            // zips them into one RowMap per element index.
            let mut array_field = Field::new(
                graphql_name,
                TypeRef::named_nn_list_nn(&element_type_name),
                move |ctx| {
                    let cols = child_columns.clone();
                    let _etype = element_type_name_clone.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
                            .map(|(gql_name, col)| {
                                let arr = row.get(col)
                                    .and_then(|v| v.as_array())
                                    .cloned()
                                    .unwrap_or_default();
                                (gql_name.as_str(), arr)
                            })
                            .collect();

                        // Element count comes from the first column; shorter
                        // columns yield Null for the missing slots.
                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
                        let mut elements = Vec::with_capacity(len);
                        for i in 0..len {
                            let mut elem = RowMap::new();
                            for (gql_name, arr) in &arrays {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(gql_name.to_string(), val);
                            }
                            // Also store each value under its raw column name
                            // so nested (e.g. union) resolvers can find it.
                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(col.clone(), val);
                            }
                            elements.push(FieldValue::owned_any(elem));
                        }
                        Ok(Some(FieldValue::list(elements)))
                    })
                },
            );
            if let Some(desc) = description {
                array_field = array_field.description(desc);
            }
            c.record_fields.push(array_field);

            // Wrapper filter exposing `includes` element-match semantics.
            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
            let wrapper_filter = InputObject::new(&wrapper_filter_name)
                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
                    .description("Match rows where at least one array element satisfies all conditions"));
            c.extra_inputs.push(wrapper_filter);

            c.filter_fields.push(InputValue::new(
                graphql_name,
                TypeRef::named(&wrapper_filter_name),
            ));

            c.extra_objects.push(element_obj);
            c.extra_inputs.push(includes_filter);
        }
    }
}
1518
1519fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1525 for v in variants {
1526 if v.source_type_names.iter().any(|s| s == type_str) {
1527 return v.type_name.clone();
1528 }
1529 }
1530 variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1531}
1532
1533fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1534 match dt {
1535 DimType::String | DimType::Decimal => TypeRef::named(TypeRef::STRING),
1536 DimType::DateTime => TypeRef::named("DateTime"),
1537 DimType::Int => TypeRef::named(TypeRef::INT),
1538 DimType::Float => TypeRef::named(TypeRef::FLOAT),
1539 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1540 }
1541}
1542
1543fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1544 match dt {
1545 DimType::String => "StringFilter",
1546 DimType::Int => "IntFilter",
1547 DimType::Float => "FloatFilter",
1548 DimType::Decimal => "DecimalFilter",
1549 DimType::DateTime => "DateTimeFilter",
1550 DimType::Bool => "BoolFilter",
1551 }
1552}
1553
1554pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1555 match v {
1556 serde_json::Value::Null => Value::Null,
1557 serde_json::Value::Bool(b) => Value::from(b),
1558 serde_json::Value::Number(n) => {
1559 if let Some(i) = n.as_i64() { Value::from(i) }
1560 else if let Some(f) = n.as_f64() { Value::from(f) }
1561 else { Value::from(n.to_string()) }
1562 }
1563 serde_json::Value::String(s) => Value::from(s),
1564 _ => Value::from(v.to_string()),
1565 }
1566}
1567
/// Builds the IR join expression for one `JoinDef` against its target cube,
/// restricting the joined subquery's SELECT list to the columns the GraphQL
/// selection under `sub_field` actually requested.
///
/// `network` picks the target cube's per-chain table; `join_idx` gives the
/// subquery a unique alias (`_jN`). If no requested path matches a target
/// dimension, every target dimension is selected as a fallback.
fn build_join_expr(
    jd: &crate::cube::definition::JoinDef,
    target_cube: &CubeDefinition,
    sub_field: &async_graphql::SelectionField<'_>,
    network: &str,
    join_idx: usize,
) -> JoinExpr {
    let target_flat = target_cube.flat_dimensions();
    let target_table = target_cube.table_for_chain(network);

    // Dimension paths the client selected under this join field.
    let mut requested_paths = HashSet::new();
    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);

    let mut selects: Vec<SelectExpr> = target_flat.iter()
        .filter(|(path, _)| requested_paths.contains(path))
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // Fallback: select every dimension when the selection matched nothing.
    if selects.is_empty() {
        selects = target_flat.iter()
            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
            .collect();
    }

    // Heuristic: a '(' in any dimension column marks it as an aggregate
    // expression, so the joined subquery needs a GROUP BY.
    let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));

    let group_by = if is_aggregate {
        // Group by the join-key columns (right side of each condition) plus
        // every non-aggregate selected column, deduplicated.
        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
        for sel in &selects {
            if let SelectExpr::Column { column, .. } = sel {
                if !column.contains('(') && !gb.contains(column) {
                    gb.push(column.clone());
                }
            }
        }
        gb
    } else {
        vec![]
    };

    JoinExpr {
        schema: target_cube.schema.clone(),
        table: target_table,
        alias: format!("_j{}", join_idx),
        conditions: jd.conditions.clone(),
        selects,
        group_by,
        use_final: target_cube.use_final,
        is_aggregate,
        target_cube: jd.target_cube.clone(),
        join_field: sub_field.name().to_string(),
        join_type: jd.join_type.clone(),
    }
}
1627
1628fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1631 match v {
1632 serde_json::Value::String(s) => {
1633 let iso = if s.contains('T') {
1634 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1635 } else {
1636 let replaced = s.replacen(' ', "T", 1);
1637 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1638 };
1639 Value::from(iso)
1640 }
1641 other => json_to_gql_value(other),
1642 }
1643}