1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr, is_aggregate_expr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Returns the row key for a metric alias: the alias prefixed with `__`.
///
/// Metric resolvers in this file look their value up in the row under
/// this key.
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Returns the row key for a dimension-aggregate alias: the alias prefixed
/// with `__da_`.
pub fn dim_agg_key(alias: &str) -> String {
    const PREFIX: &str = "__da_";
    let mut key = String::with_capacity(PREFIX.len() + alias.len());
    key.push_str(PREFIX);
    key.push_str(alias);
    key
}
20
21pub type QueryExecutor = Arc<
24 dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
25 Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
26 > + Send + Sync,
27>;
28
/// Describes one additional argument exposed on a chain-group wrapper field.
#[derive(Debug, Clone)]
pub struct WrapperArg {
    /// GraphQL argument name.
    pub name: String,
    /// Name of the GraphQL type the argument is declared with (nullable).
    pub type_ref: String,
    /// Description attached to the argument in the schema.
    pub description: String,
    /// Optional list of values.
    // NOTE(review): not read by the schema builder in this part of the file;
    // presumably the allowed values for an enum-typed argument — confirm.
    pub values: Option<Vec<String>>,
}
43
44#[derive(Debug, Clone)]
46pub struct ChainGroupConfig {
47 pub name: String,
49 pub group: ChainGroup,
51 pub networks: Vec<String>,
53 pub has_network_arg: bool,
56 pub extra_args: Vec<WrapperArg>,
59 pub network_enum_name: Option<String>,
63}
64
65pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
69
70pub struct SchemaConfig {
72 pub chain_groups: Vec<ChainGroupConfig>,
73 pub root_query_name: String,
74 pub stats_callback: Option<StatsCallback>,
77 pub table_name_transform: Option<TableNameTransform>,
79 pub extra_types: Vec<Enum>,
81}
82
83impl Default for SchemaConfig {
84 fn default() -> Self {
85 Self {
86 chain_groups: vec![
87 ChainGroupConfig {
88 name: "EVM".to_string(),
89 group: ChainGroup::Evm,
90 networks: vec!["eth".into(), "bsc".into()],
91 has_network_arg: true,
92 extra_args: vec![],
93 network_enum_name: None,
94 },
95 ChainGroupConfig {
96 name: "Solana".to_string(),
97 group: ChainGroup::Solana,
98 networks: vec!["sol".into()],
99 has_network_arg: false,
100 extra_args: vec![],
101 network_enum_name: None,
102 },
103 ChainGroupConfig {
104 name: "Trading".to_string(),
105 group: ChainGroup::Trading,
106 networks: vec!["sol".into(), "eth".into(), "bsc".into()],
107 has_network_arg: false,
108 extra_args: vec![],
109 network_enum_name: None,
110 },
111 ],
112 root_query_name: "ChainStream".to_string(),
113 stats_callback: None,
114 table_name_transform: None,
115 extra_types: vec![],
116 }
117 }
118}
119
/// Context produced by a chain-group wrapper resolver and read by the cube
/// resolvers nested under it.
///
/// Derives `Debug` and `Clone` so it can be logged and copied like the other
/// public configuration types in this module.
#[derive(Debug, Clone)]
pub struct ChainContext {
    /// Network selected for the query (the `network` argument, or the
    /// group's default network).
    pub network: String,
    /// Resolved values of the group's extra arguments, keyed by argument
    /// name and rendered as strings.
    pub extra: HashMap<String, String>,
}
127
128pub fn build_schema(
140 registry: CubeRegistry,
141 dialect: Arc<dyn SqlDialect>,
142 executor: QueryExecutor,
143 config: SchemaConfig,
144) -> Result<Schema, SchemaError> {
145 let mut builder = Schema::build("Query", None, None);
146
147 builder = builder.register(filter_types::build_limit_input());
149 builder = builder.register(
151 Enum::new("OrderDirection")
152 .description("Sort direction")
153 .item(EnumItem::new("ASC").description("Ascending"))
154 .item(EnumItem::new("DESC").description("Descending")),
155 );
156 for input in filter_types::build_filter_primitives() {
157 builder = builder.register(input);
158 }
159 builder = builder.register(
160 Scalar::new("DateTime")
161 .description("ISO 8601 date-time string (e.g. \"2026-04-01T00:00:00Z\")"),
162 );
163 builder = builder.register(
164 Scalar::new("ISO8601DateTime")
165 .description("ISO 8601 date-time string (alias for DateTime, V1 compatibility)"),
166 );
167 builder = builder.register(
168 Enum::new("TimeUnit")
169 .description("Time unit for interval bucketing")
170 .item(EnumItem::new("seconds"))
171 .item(EnumItem::new("minutes"))
172 .item(EnumItem::new("hours"))
173 .item(EnumItem::new("days"))
174 .item(EnumItem::new("weeks"))
175 .item(EnumItem::new("months")),
176 );
177 builder = builder.register(
178 InputObject::new("TimeIntervalInput")
179 .description("Time bucketing interval for DateTime dimensions")
180 .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
181 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
182 );
183 builder = builder.register(
184 InputObject::new("DimSelectWhere")
185 .description("Post-aggregation filter for dimension values (HAVING clause)")
186 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
187 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
188 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
189 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
190 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
191 .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
192 );
193
194 for extra_enum in config.extra_types {
195 builder = builder.register(extra_enum);
196 }
197
198 for grp in &config.chain_groups {
200 if grp.has_network_arg {
201 let enum_name = grp.network_enum_name.clone()
202 .unwrap_or_else(|| format!("{}Network", grp.name));
203 let mut net_enum = Enum::new(&enum_name)
204 .description(format!("{} network selector", grp.name));
205 for net in &grp.networks {
206 net_enum = net_enum.item(EnumItem::new(net));
207 }
208 builder = builder.register(net_enum);
209 }
210 }
211
212 let mut registered_cubes: HashSet<String> = HashSet::new();
214 for cube in registry.cubes() {
215 if registered_cubes.contains(&cube.name) { continue; }
216 registered_cubes.insert(cube.name.clone());
217
218 let types = build_cube_types(cube);
219 for obj in types.objects { builder = builder.register(obj); }
220 for inp in types.inputs { builder = builder.register(inp); }
221 for en in types.enums { builder = builder.register(en); }
222 for un in types.unions { builder = builder.register(un); }
223 }
224
225 let mut query = Object::new("Query");
227
228 for grp in &config.chain_groups {
229 let wrapper_type_name = grp.name.clone();
230
231 let mut wrapper_obj = Object::new(&wrapper_type_name);
234
235 for cube in registry.cubes() {
236 if !cube.chain_groups.contains(&grp.group) { continue; }
237
238 let cube_field = build_cube_field(
239 cube,
240 dialect.clone(),
241 executor.clone(),
242 config.stats_callback.clone(),
243 );
244 wrapper_obj = wrapper_obj.field(cube_field);
245 }
246 builder = builder.register(wrapper_obj);
247
248 let default_network = grp.networks.first().cloned().unwrap_or_default();
250 let has_net_arg = grp.has_network_arg;
251 let net_enum_name = grp.network_enum_name.clone()
252 .unwrap_or_else(|| format!("{}Network", grp.name));
253 let wrapper_type_for_resolver = wrapper_type_name.clone();
254 let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();
255
256 let mut wrapper_field = Field::new(
257 &wrapper_type_name,
258 TypeRef::named_nn(&wrapper_type_name),
259 move |ctx| {
260 let default = default_network.clone();
261 let has_arg = has_net_arg;
262 let arg_names = extra_arg_names.clone();
263 FieldFuture::new(async move {
264 let network = if has_arg {
265 ctx.args.try_get("network")
266 .ok()
267 .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
268 .unwrap_or(default)
269 } else {
270 default
271 };
272 let mut extra = HashMap::new();
273 for arg_name in &arg_names {
274 if let Ok(val) = ctx.args.try_get(arg_name) {
275 let resolved = val.enum_name().ok().map(|s| s.to_string())
276 .or_else(|| val.boolean().ok().map(|b| b.to_string()))
277 .or_else(|| val.string().ok().map(|s| s.to_string()));
278 if let Some(v) = resolved {
279 extra.insert(arg_name.clone(), v);
280 }
281 }
282 }
283 Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
284 })
285 },
286 );
287
288 if grp.has_network_arg {
289 let net_type_ref = if grp.networks.len() > 1 {
290 TypeRef::named_nn(&net_enum_name) } else {
292 TypeRef::named(&net_enum_name) };
294 wrapper_field = wrapper_field.argument(
295 InputValue::new("network", net_type_ref)
296 .description(format!("{} network to query", wrapper_type_for_resolver)),
297 );
298 }
299 for wa in &grp.extra_args {
300 wrapper_field = wrapper_field.argument(
301 InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
302 .description(&wa.description),
303 );
304 }
305
306 query = query.field(wrapper_field);
307 }
308
309 let metadata_registry = Arc::new(registry.clone());
311 let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
312 .map(|g| (g.name.clone(), g.group.clone()))
313 .collect();
314 let metadata_field = Field::new(
315 "_cubeMetadata",
316 TypeRef::named_nn(TypeRef::STRING),
317 move |_ctx| {
318 let reg = metadata_registry.clone();
319 let groups = metadata_groups.clone();
320 FieldFuture::new(async move {
321 let mut group_metadata: Vec<serde_json::Value> = Vec::new();
322 for (group_name, group_enum) in &groups {
323 let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
324 .filter(|c| c.chain_groups.contains(group_enum))
325 .map(serialize_cube_metadata)
326 .collect();
327 group_metadata.push(serde_json::json!({
328 "group": group_name,
329 "cubes": cubes_in_group,
330 }));
331 }
332 let json = serde_json::to_string(&group_metadata).unwrap_or_default();
333 Ok(Some(FieldValue::value(Value::from(json))))
334 })
335 },
336 )
337 .description("Internal: returns JSON metadata about all cubes grouped by chain");
338 query = query.field(metadata_field);
339
340 builder = builder.register(query);
341 builder = builder.data(registry);
342
343 if let Some(transform) = config.table_name_transform.clone() {
344 builder = builder.data(transform);
345 }
346
347 builder.finish()
348}
349
350fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
351 serde_json::json!({
352 "name": cube.name,
353 "description": cube.description,
354 "schema": cube.schema,
355 "tablePattern": cube.table_pattern,
356 "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
357 "metrics": cube.metrics.iter().map(|m| {
358 let mut obj = serde_json::json!({
359 "name": m.name,
360 "returnType": format!("{:?}", m.return_type),
361 });
362 if let Some(ref tmpl) = m.expression_template {
363 obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
364 }
365 if let Some(ref desc) = m.description {
366 obj["description"] = serde_json::Value::String(desc.clone());
367 }
368 obj
369 }).collect::<Vec<_>>(),
370 "selectors": cube.selectors.iter().map(|s| {
371 serde_json::json!({
372 "name": s.graphql_name,
373 "column": s.column,
374 "type": format!("{:?}", s.dim_type),
375 })
376 }).collect::<Vec<_>>(),
377 "dimensions": serialize_dims(&cube.dimensions),
378 "joins": cube.joins.iter().map(|j| {
379 serde_json::json!({
380 "field": j.field_name,
381 "target": j.target_cube,
382 "joinType": format!("{:?}", j.join_type),
383 })
384 }).collect::<Vec<_>>(),
385 "tableRoutes": cube.table_routes.iter().map(|r| {
386 serde_json::json!({
387 "schema": r.schema,
388 "tablePattern": r.table_pattern,
389 "availableColumns": r.available_columns,
390 "priority": r.priority,
391 })
392 }).collect::<Vec<_>>(),
393 "defaultLimit": cube.default_limit,
394 "maxLimit": cube.max_limit,
395 })
396}
397
398fn build_cube_field(
400 cube: &CubeDefinition,
401 dialect: Arc<dyn SqlDialect>,
402 executor: QueryExecutor,
403 stats_cb: Option<StatsCallback>,
404) -> Field {
405 let cube_name = cube.name.clone();
406 let orderby_input_name = format!("{}OrderByInput", cube.name);
407 let cube_description = cube.description.clone();
408
409 let mut field = Field::new(
410 &cube.name,
411 TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
412 move |ctx| {
413 let cube_name = cube_name.clone();
414 let dialect = dialect.clone();
415 let executor = executor.clone();
416 let stats_cb = stats_cb.clone();
417 FieldFuture::new(async move {
418 let registry = ctx.ctx.data::<CubeRegistry>()?;
419
420 let chain_ctx = ctx.parent_value
421 .try_downcast_ref::<ChainContext>().ok();
422 let network = chain_ctx
423 .map(|c| c.network.as_str())
424 .unwrap_or("sol");
425
426 let cube_def = registry.get(&cube_name).ok_or_else(|| {
427 async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
428 })?;
429
430 let metric_requests = extract_metric_requests(&ctx, cube_def);
431 let quantile_requests = extract_quantile_requests(&ctx);
432 let calculate_requests = extract_calculate_requests(&ctx);
433 let field_aliases = extract_field_aliases(&ctx, cube_def);
434 let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
435 let time_intervals = extract_time_interval_requests(&ctx, cube_def);
436 let requested = extract_requested_fields(&ctx, cube_def);
437 let mut ir = compiler::parser::parse_cube_query(
438 cube_def,
439 network,
440 &ctx.args,
441 &metric_requests,
442 &quantile_requests,
443 &calculate_requests,
444 &field_aliases,
445 &dim_agg_requests,
446 &time_intervals,
447 Some(requested),
448 )?;
449
450 let mut join_idx = 0usize;
451 for sub_field in ctx.ctx.field().selection_set() {
452 let fname = sub_field.name().to_string();
453 let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
454 if let Some(jd) = join_def {
455 if let Some(target_cube) = registry.get(&jd.target_cube) {
456 let join_expr = build_join_expr(
457 jd, target_cube, &sub_field, network, join_idx,
458 );
459 ir.joins.push(join_expr);
460 join_idx += 1;
461 }
462 }
463 }
464
465 if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
466 if let Some(cctx) = chain_ctx {
467 ir.table = transform(&ir.table, cctx);
468 }
469 }
470
471 let validated = compiler::validator::validate(ir)?;
472 let result = dialect.compile(&validated);
473 let sql = result.sql;
474 let bindings = result.bindings;
475
476 let rows = executor(sql.clone(), bindings).await.map_err(|e| {
477 async_graphql::Error::new(format!("Query execution failed: {e}"))
478 })?;
479
480 let rows = if result.alias_remap.is_empty() {
481 rows
482 } else {
483 rows.into_iter().map(|mut row| {
484 for (alias, original) in &result.alias_remap {
485 if let Some(val) = row.shift_remove(alias) {
486 row.entry(original.clone()).or_insert(val);
487 }
488 }
489 row
490 }).collect()
491 };
492
493 let rows: Vec<RowMap> = if validated.joins.is_empty() {
494 rows
495 } else {
496 rows.into_iter().map(|mut row| {
497 for join in &validated.joins {
498 let prefix = format!("{}.", join.alias);
499 let mut sub_row = RowMap::new();
500 let keys: Vec<String> = row.keys()
501 .filter(|k| k.starts_with(&prefix))
502 .cloned()
503 .collect();
504 for key in keys {
505 if let Some(val) = row.shift_remove(&key) {
506 sub_row.insert(key[prefix.len()..].to_string(), val);
507 }
508 }
509 let obj: serde_json::Map<String, serde_json::Value> =
510 sub_row.into_iter().collect();
511 row.insert(
512 join.join_field.clone(),
513 serde_json::Value::Object(obj),
514 );
515 }
516 row
517 }).collect()
518 };
519
520 let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
521 .or_else(|| stats_cb.clone());
522 if let Some(cb) = effective_cb {
523 let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
524 cb(stats);
525 }
526
527 let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
528 Ok(Some(FieldValue::list(values)))
529 })
530 },
531 );
532 if !cube_description.is_empty() {
533 field = field.description(&cube_description);
534 }
535 field = field
536 .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
537 .description("Filter conditions"))
538 .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
539 .description("Pagination control"))
540 .argument(InputValue::new("limitBy", TypeRef::named(format!("{}LimitByInput", cube.name)))
541 .description("Per-group row limit"))
542 .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
543 .description("Sort order (Bitquery-compatible)"));
544
545 for sel in &cube.selectors {
546 let filter_type = dim_type_to_filter_name(&sel.dim_type);
547 field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
548 .description(format!("Shorthand filter for {}", sel.graphql_name)));
549 }
550
551 field
552}
553
554fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
555 serde_json::Value::Array(dims.iter().map(|d| match d {
556 DimensionNode::Leaf(dim) => {
557 let mut obj = serde_json::json!({
558 "name": dim.graphql_name,
559 "column": dim.column,
560 "type": format!("{:?}", dim.dim_type),
561 });
562 if let Some(desc) = &dim.description {
563 obj["description"] = serde_json::Value::String(desc.clone());
564 }
565 obj
566 },
567 DimensionNode::Group { graphql_name, description, children } => {
568 let mut obj = serde_json::json!({
569 "name": graphql_name,
570 "children": serialize_dims(children),
571 });
572 if let Some(desc) = description {
573 obj["description"] = serde_json::Value::String(desc.clone());
574 }
575 obj
576 },
577 DimensionNode::Array { graphql_name, description, children } => {
578 let fields: Vec<serde_json::Value> = children.iter().map(|f| {
579 let type_value = match &f.field_type {
580 crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
581 "kind": "scalar",
582 "scalarType": format!("{:?}", dt),
583 }),
584 crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
585 "kind": "union",
586 "variants": variants.iter().map(|v| serde_json::json!({
587 "typeName": v.type_name,
588 "fieldName": v.field_name,
589 "sourceType": format!("{:?}", v.source_type),
590 })).collect::<Vec<_>>(),
591 }),
592 };
593 let mut field_obj = serde_json::json!({
594 "name": f.graphql_name,
595 "column": f.column,
596 "type": type_value,
597 });
598 if let Some(desc) = &f.description {
599 field_obj["description"] = serde_json::Value::String(desc.clone());
600 }
601 field_obj
602 }).collect();
603 let mut obj = serde_json::json!({
604 "name": graphql_name,
605 "kind": "array",
606 "fields": fields,
607 });
608 if let Some(desc) = description {
609 obj["description"] = serde_json::Value::String(desc.clone());
610 }
611 obj
612 },
613 }).collect())
614}
615
616fn extract_metric_requests(
620 ctx: &async_graphql::dynamic::ResolverContext,
621 cube: &CubeDefinition,
622) -> Vec<compiler::parser::MetricRequest> {
623 let mut requests = Vec::new();
624
625 for sub_field in ctx.ctx.field().selection_set() {
626 let name = sub_field.name();
627 if !cube.has_metric(name) {
628 continue;
629 }
630
631 let args = match sub_field.arguments() {
632 Ok(args) => args,
633 Err(_) => continue,
634 };
635
636 let of_dimension = args
637 .iter()
638 .find(|(k, _)| k.as_str() == "of")
639 .or_else(|| args.iter().find(|(k, _)| k.as_str() == "distinct"))
640 .and_then(|(_, v)| match v {
641 async_graphql::Value::Enum(e) => Some(e.to_string()),
642 async_graphql::Value::String(s) => Some(s.clone()),
643 _ => None,
644 })
645 .unwrap_or_else(|| "*".to_string());
646
647 let select_where_value = args
648 .iter()
649 .find(|(k, _)| k.as_str() == "selectWhere")
650 .map(|(_, v)| v.clone());
651
652 let condition_filter = args
653 .iter()
654 .find(|(k, _)| k.as_str() == "if")
655 .and_then(|(_, v)| {
656 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
657 .and_then(|f| if f.is_empty() { None } else { Some(f) })
658 });
659
660 let gql_alias = sub_field.alias().unwrap_or(name).to_string();
661 requests.push(compiler::parser::MetricRequest {
662 function: name.to_string(),
663 alias: gql_alias,
664 of_dimension,
665 select_where_value,
666 condition_filter,
667 });
668 }
669
670 requests
671}
672
673fn extract_requested_fields(
674 ctx: &async_graphql::dynamic::ResolverContext,
675 cube: &CubeDefinition,
676) -> HashSet<String> {
677 let mut fields = HashSet::new();
678 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
679 fields
680}
681
682fn collect_selection_paths(
683 field: &async_graphql::SelectionField<'_>,
684 prefix: &str,
685 out: &mut HashSet<String>,
686 metrics: &[MetricDef],
687) {
688 for sub in field.selection_set() {
689 let name = sub.name();
690 if metrics.iter().any(|m| m.name == name) {
691 continue;
692 }
693 let path = if prefix.is_empty() {
694 name.to_string()
695 } else {
696 format!("{prefix}_{name}")
697 };
698 let has_children = sub.selection_set().next().is_some();
699 if has_children {
700 collect_selection_paths(&sub, &path, out, metrics);
701 } else {
702 out.insert(path);
703 }
704 }
705}
706
/// List of `(alias path, column)` pairs for aliased dimension fields.
pub type FieldAliasMap = Vec<(String, String)>;
710
/// A `quantile` field selected in a cube query.
///
/// Derives `Debug` and `Clone` so requests can be logged and copied.
#[derive(Debug, Clone)]
pub struct QuantileRequest {
    /// GraphQL alias of the field, or `"quantile"` when none was given.
    pub alias: String,
    /// Value of the `of` argument, or `"*"` when absent or not an enum/string.
    pub of_dimension: String,
    /// Value of the `level` argument; `0.5` when absent or not numeric.
    pub level: f64,
}
717
/// A `calculate` field selected in a cube query.
///
/// Derives `Debug` and `Clone` so requests can be logged and copied.
#[derive(Debug, Clone)]
pub struct CalculateRequest {
    /// GraphQL alias of the field, or `"calculate"` when none was given.
    pub alias: String,
    /// String value of the field's `expression` argument.
    pub expression: String,
}
723
/// A dimension field selected with an `interval` argument (time bucketing).
///
/// Derives `Debug` and `Clone` so requests can be logged and copied.
#[derive(Debug, Clone)]
pub struct TimeIntervalRequest {
    /// Underscore-joined path of the selected field.
    pub field_path: String,
    /// GraphQL alias of the field, or its name when none was given.
    pub graphql_alias: String,
    /// Column of the dimension found at `field_path`.
    pub column: String,
    /// Value of the interval's `in` entry.
    pub unit: String,
    /// Value of the interval's `count` entry.
    pub count: i64,
}
733
734pub struct DimAggRequest {
737 pub field_path: String,
738 pub graphql_alias: String,
739 pub value_column: String,
740 pub agg_type: DimAggType,
741 pub compare_column: String,
742 pub condition_filter: Option<FilterNode>,
743 pub select_where_value: Option<async_graphql::Value>,
744}
745
746fn extract_quantile_requests(
747 ctx: &async_graphql::dynamic::ResolverContext,
748) -> Vec<QuantileRequest> {
749 let mut requests = Vec::new();
750 for sub_field in ctx.ctx.field().selection_set() {
751 if sub_field.name() != "quantile" { continue; }
752 let args = match sub_field.arguments() {
753 Ok(a) => a,
754 Err(_) => continue,
755 };
756 let of_dim = args.iter()
757 .find(|(k, _)| k.as_str() == "of")
758 .and_then(|(_, v)| match v {
759 async_graphql::Value::Enum(e) => Some(e.to_string()),
760 async_graphql::Value::String(s) => Some(s.clone()),
761 _ => None,
762 })
763 .unwrap_or_else(|| "*".to_string());
764 let level = args.iter()
765 .find(|(k, _)| k.as_str() == "level")
766 .and_then(|(_, v)| match v {
767 async_graphql::Value::Number(n) => n.as_f64(),
768 _ => None,
769 })
770 .unwrap_or(0.5);
771 let alias = sub_field.alias().unwrap_or("quantile").to_string();
772 requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
773 }
774 requests
775}
776
777fn extract_field_aliases(
778 ctx: &async_graphql::dynamic::ResolverContext,
779 cube: &CubeDefinition,
780) -> FieldAliasMap {
781 let flat = cube.flat_dimensions();
782 let mut aliases = Vec::new();
783 collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
784 aliases
785}
786
787fn collect_field_aliases(
788 field: &async_graphql::SelectionField<'_>,
789 prefix: &str,
790 flat: &[(String, crate::cube::definition::Dimension)],
791 metrics: &[MetricDef],
792 out: &mut FieldAliasMap,
793) {
794 for sub in field.selection_set() {
795 let name = sub.name();
796 if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
797 continue;
798 }
799 let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
800 let has_children = sub.selection_set().next().is_some();
801 if has_children {
802 collect_field_aliases(&sub, &path, flat, metrics, out);
803 } else if let Some(alias) = sub.alias() {
804 if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
805 let alias_path = if prefix.is_empty() {
806 alias.to_string()
807 } else {
808 format!("{prefix}_{alias}")
809 };
810 out.push((alias_path, dim.column.clone()));
811 }
812 }
813 }
814}
815
816fn extract_calculate_requests(
817 ctx: &async_graphql::dynamic::ResolverContext,
818) -> Vec<CalculateRequest> {
819 let mut requests = Vec::new();
820 for sub_field in ctx.ctx.field().selection_set() {
821 if sub_field.name() != "calculate" { continue; }
822 let args = match sub_field.arguments() {
823 Ok(a) => a,
824 Err(_) => continue,
825 };
826 let expression = args.iter()
827 .find(|(k, _)| k.as_str() == "expression")
828 .and_then(|(_, v)| match v {
829 async_graphql::Value::String(s) => Some(s.clone()),
830 _ => None,
831 });
832 if let Some(expr) = expression {
833 let alias = sub_field.alias().unwrap_or("calculate").to_string();
834 requests.push(CalculateRequest { alias, expression: expr });
835 }
836 }
837 requests
838}
839
840fn extract_dim_agg_requests(
841 ctx: &async_graphql::dynamic::ResolverContext,
842 cube: &CubeDefinition,
843) -> Vec<DimAggRequest> {
844 let flat = cube.flat_dimensions();
845 let mut requests = Vec::new();
846 collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
847 requests
848}
849
850fn collect_dim_agg_paths(
851 field: &async_graphql::SelectionField<'_>,
852 prefix: &str,
853 flat: &[(String, crate::cube::definition::Dimension)],
854 metrics: &[MetricDef],
855 cube: &CubeDefinition,
856 out: &mut Vec<DimAggRequest>,
857) {
858 for sub in field.selection_set() {
859 let name = sub.name();
860 if metrics.iter().any(|m| m.name == name) {
861 continue;
862 }
863 let path = if prefix.is_empty() {
864 name.to_string()
865 } else {
866 format!("{prefix}_{name}")
867 };
868 let has_children = sub.selection_set().next().is_some();
869 if has_children {
870 collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
871 } else {
872 let args = match sub.arguments() {
873 Ok(a) => a,
874 Err(_) => continue,
875 };
876 let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
877 let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
878
879 let (agg_type, compare_path) = if let Some((_, v)) = max_val {
880 let cp = match v {
881 async_graphql::Value::Enum(e) => e.to_string(),
882 async_graphql::Value::String(s) => s.clone(),
883 _ => continue,
884 };
885 (DimAggType::ArgMax, cp)
886 } else if let Some((_, v)) = min_val {
887 let cp = match v {
888 async_graphql::Value::Enum(e) => e.to_string(),
889 async_graphql::Value::String(s) => s.clone(),
890 _ => continue,
891 };
892 (DimAggType::ArgMin, cp)
893 } else {
894 continue;
895 };
896
897 let value_column = flat.iter()
898 .find(|(p, _)| p == &path)
899 .map(|(_, dim)| dim.column.clone());
900 let compare_column = flat.iter()
901 .find(|(p, _)| p == &compare_path)
902 .map(|(_, dim)| dim.column.clone());
903
904 if let (Some(vc), Some(cc)) = (value_column, compare_column) {
905 let condition_filter = args.iter()
906 .find(|(k, _)| k.as_str() == "if")
907 .and_then(|(_, v)| {
908 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
909 .and_then(|f| if f.is_empty() { None } else { Some(f) })
910 });
911 let select_where_value = args.iter()
912 .find(|(k, _)| k.as_str() == "selectWhere")
913 .map(|(_, v)| v.clone());
914
915 let gql_alias = sub.alias()
916 .map(|a| a.to_string())
917 .unwrap_or_else(|| path.clone());
918 out.push(DimAggRequest {
919 field_path: path,
920 graphql_alias: gql_alias,
921 value_column: vc,
922 agg_type,
923 compare_column: cc,
924 condition_filter,
925 select_where_value,
926 });
927 }
928 }
929 }
930}
931
932fn extract_time_interval_requests(
933 ctx: &async_graphql::dynamic::ResolverContext,
934 cube: &CubeDefinition,
935) -> Vec<TimeIntervalRequest> {
936 let flat = cube.flat_dimensions();
937 let mut requests = Vec::new();
938 collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
939 requests
940}
941
942fn collect_time_interval_paths(
943 field: &async_graphql::SelectionField<'_>,
944 prefix: &str,
945 flat: &[(String, crate::cube::definition::Dimension)],
946 metrics: &[MetricDef],
947 out: &mut Vec<TimeIntervalRequest>,
948) {
949 for sub in field.selection_set() {
950 let name = sub.name();
951 if metrics.iter().any(|m| m.name == name) { continue; }
952 let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
953 let has_children = sub.selection_set().next().is_some();
954 if has_children {
955 collect_time_interval_paths(&sub, &path, flat, metrics, out);
956 } else {
957 let args = match sub.arguments() {
958 Ok(a) => a,
959 Err(_) => continue,
960 };
961 let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
962 if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
963 let unit = obj.get("in")
964 .and_then(|v| match v {
965 async_graphql::Value::Enum(e) => Some(e.to_string()),
966 async_graphql::Value::String(s) => Some(s.clone()),
967 _ => None,
968 });
969 let count = obj.get("count")
970 .and_then(|v| match v {
971 async_graphql::Value::Number(n) => n.as_i64(),
972 _ => None,
973 });
974 if let (Some(unit), Some(count)) = (unit, count) {
975 if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
976 let gql_alias = sub.alias().unwrap_or(name).to_string();
977 out.push(TimeIntervalRequest {
978 field_path: path,
979 graphql_alias: gql_alias,
980 column: dim.column.clone(),
981 unit,
982 count,
983 });
984 }
985 }
986 }
987 }
988 }
989}
990
991struct CubeTypes {
996 objects: Vec<Object>,
997 inputs: Vec<InputObject>,
998 enums: Vec<Enum>,
999 unions: Vec<Union>,
1000}
1001
1002fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
1003 let record_name = format!("{}Record", cube.name);
1004 let filter_name = format!("{}Filter", cube.name);
1005 let compare_enum_name = format!("{}CompareFields", cube.name);
1006
1007 let flat_dims = cube.flat_dimensions();
1008
1009 let mut compare_enum = Enum::new(&compare_enum_name)
1010 .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
1011 for (path, _) in &flat_dims {
1012 compare_enum = compare_enum.item(EnumItem::new(path));
1013 }
1014
1015 let mut record_fields: Vec<Field> = Vec::new();
1016 let mut filter_fields: Vec<InputValue> = Vec::new();
1017 let mut extra_objects: Vec<Object> = Vec::new();
1018 let mut extra_inputs: Vec<InputObject> = Vec::new();
1019 let mut extra_unions: Vec<Union> = Vec::new();
1020
1021 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
1022 .description("OR combinator — matches if any sub-filter matches"));
1023
1024 {
1025 let mut collector = DimCollector {
1026 cube_name: &cube.name,
1027 compare_enum_name: &compare_enum_name,
1028 filter_name: &filter_name,
1029 record_fields: &mut record_fields,
1030 filter_fields: &mut filter_fields,
1031 extra_objects: &mut extra_objects,
1032 extra_inputs: &mut extra_inputs,
1033 extra_unions: &mut extra_unions,
1034 };
1035 for node in &cube.dimensions {
1036 collect_dimension_types(node, "", &mut collector);
1037 }
1038 }
1039
1040 let mut metric_enums: Vec<Enum> = Vec::new();
1041 let builtin_descs: std::collections::HashMap<&str, &str> = [
1042 ("count", "Count of rows or distinct values"),
1043 ("sum", "Sum of values"),
1044 ("avg", "Average of values"),
1045 ("min", "Minimum value"),
1046 ("max", "Maximum value"),
1047 ("uniq", "Count of unique (distinct) values"),
1048 ].into_iter().collect();
1049
1050 for metric in &cube.metrics {
1051 let metric_name = &metric.name;
1052 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1053
1054 if metric.supports_where {
1055 extra_inputs.push(
1056 InputObject::new(&select_where_name)
1057 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1058 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1059 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1060 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1061 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1062 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
1063 .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
1064 );
1065 }
1066
1067 let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1068 let mut of_enum = Enum::new(&of_enum_name)
1069 .description(format!("Dimension to apply {} aggregation on", metric_name));
1070 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1071 metric_enums.push(of_enum);
1072
1073 let metric_name_clone = metric_name.clone();
1074 let return_type_ref = dim_type_to_typeref(&metric.return_type);
1075 let metric_desc = metric.description.as_deref()
1076 .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1077 .unwrap_or("Aggregate metric");
1078
1079 let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1080 let default_name = metric_name_clone.clone();
1081 FieldFuture::new(async move {
1082 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1083 let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1084 let key = metric_key(alias);
1085 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1086 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1087 })
1088 })
1089 .description(metric_desc)
1090 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1091 .description("Dimension to aggregate on (default: all rows)"));
1092
1093 if metric_name == "count" {
1094 metric_field = metric_field
1095 .argument(InputValue::new("distinct", TypeRef::named(&of_enum_name))
1096 .description("Count distinct values of this dimension (alias for of, maps to uniqExact)"));
1097 }
1098
1099 if metric.supports_where {
1100 metric_field = metric_field
1101 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1102 .description("Post-aggregation filter (HAVING)"))
1103 .argument(InputValue::new("if", TypeRef::named(&filter_name))
1104 .description("Conditional filter for this metric"));
1105 }
1106
1107 record_fields.push(metric_field);
1108 }
1109
1110 {
1112 let of_enum_name = format!("{}_quantile_Of", cube.name);
1113 let mut of_enum = Enum::new(&of_enum_name)
1114 .description(format!("Dimension to apply quantile on for {}", cube.name));
1115 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1116 metric_enums.push(of_enum);
1117
1118 let of_enum_for_closure = of_enum_name.clone();
1119 let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1120 FieldFuture::new(async move {
1121 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1122 let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1123 let key = metric_key(alias);
1124 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1125 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1126 })
1127 })
1128 .description("Compute a quantile (percentile) of a dimension")
1129 .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1130 .description("Dimension to compute quantile on"))
1131 .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1132 .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1133 record_fields.push(quantile_field);
1134 }
1135
1136 {
1138 let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1139 FieldFuture::new(async move {
1140 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1141 let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1142 let key = metric_key(alias);
1143 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1144 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1145 })
1146 })
1147 .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1148 .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1149 .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1150 record_fields.push(calculate_field);
1151 }
1152
1153 for jd in &cube.joins {
1155 let target_record_name = format!("{}Record", jd.target_cube);
1156 let field_name_owned = jd.field_name.clone();
1157 let mut join_field = Field::new(
1158 &jd.field_name,
1159 TypeRef::named(&target_record_name),
1160 move |ctx| {
1161 let field_name = field_name_owned.clone();
1162 FieldFuture::new(async move {
1163 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1164 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1165 let sub_row: RowMap = obj.iter()
1166 .map(|(k, v)| (k.clone(), v.clone()))
1167 .collect();
1168 Ok(Some(FieldValue::owned_any(sub_row)))
1169 } else {
1170 Ok(Some(FieldValue::value(Value::Null)))
1171 }
1172 })
1173 },
1174 );
1175 if let Some(desc) = &jd.description {
1176 join_field = join_field.description(desc);
1177 }
1178 record_fields.push(join_field);
1179 }
1180
1181 let mut record = Object::new(&record_name);
1182 for f in record_fields { record = record.field(f); }
1183
1184 let mut filter = InputObject::new(&filter_name)
1185 .description(format!("Filter conditions for {} query", cube.name));
1186 for f in filter_fields { filter = filter.field(f); }
1187
1188 let orderby_input_name = format!("{}OrderByInput", cube.name);
1189 let orderby_input = InputObject::new(&orderby_input_name)
1190 .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1191 .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1192 .description("Sort descending by this field"))
1193 .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1194 .description("Sort ascending by this field"))
1195 .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1196 .description("Sort descending by computed/aggregated field name"))
1197 .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1198 .description("Sort ascending by computed/aggregated field name"));
1199
1200 let limitby_input_name = format!("{}LimitByInput", cube.name);
1201 let limitby_input = InputObject::new(&limitby_input_name)
1202 .description(format!("Per-group row limit for {}", cube.name))
1203 .field(InputValue::new("by", TypeRef::named_nn(&compare_enum_name))
1204 .description("Dimension field to group by"))
1205 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
1206 .description("Maximum rows per group"))
1207 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
1208 .description("Rows to skip per group"));
1209
1210 let mut objects = vec![record]; objects.extend(extra_objects);
1211 let mut inputs = vec![filter, orderby_input, limitby_input]; inputs.extend(extra_inputs);
1212 let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1213
1214 CubeTypes { objects, inputs, enums, unions: extra_unions }
1215}
1216
1217struct DimCollector<'a> {
1218 cube_name: &'a str,
1219 compare_enum_name: &'a str,
1220 filter_name: &'a str,
1221 record_fields: &'a mut Vec<Field>,
1222 filter_fields: &'a mut Vec<InputValue>,
1223 extra_objects: &'a mut Vec<Object>,
1224 extra_inputs: &'a mut Vec<InputObject>,
1225 extra_unions: &'a mut Vec<Union>,
1226}
1227
1228fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1229 match node {
1230 DimensionNode::Leaf(dim) => {
1231 let col = dim.column.clone();
1232 let is_datetime = dim.dim_type == DimType::DateTime;
1233 let compare_enum = c.compare_enum_name.to_string();
1234 let cube_filter = c.filter_name.to_string();
1235 let mut leaf_field = Field::new(
1236 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1237 move |ctx| {
1238 let col = col.clone();
1239 FieldFuture::new(async move {
1240 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1241 let has_interval = ctx.args.try_get("interval").is_ok();
1242 let has_max = ctx.args.try_get("maximum").is_ok();
1243 let has_min = ctx.args.try_get("minimum").is_ok();
1244 let key = if has_interval || has_max || has_min {
1245 let name = ctx.ctx.field().alias().unwrap_or(&col);
1246 dim_agg_key(name)
1247 } else {
1248 col.clone()
1249 };
1250 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1251 let gql_val = if is_datetime {
1252 json_to_gql_datetime(val)
1253 } else {
1254 json_to_gql_value(val)
1255 };
1256 Ok(Some(FieldValue::value(gql_val)))
1257 })
1258 },
1259 );
1260 if let Some(desc) = &dim.description {
1261 leaf_field = leaf_field.description(desc);
1262 }
1263 leaf_field = leaf_field
1264 .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1265 .description("Return value from row where compare field is maximum (argMax)"))
1266 .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1267 .description("Return value from row where compare field is minimum (argMin)"))
1268 .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1269 .description("Conditional filter for aggregation"))
1270 .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1271 .description("Post-aggregation value filter (HAVING)"));
1272 if is_datetime {
1273 leaf_field = leaf_field
1274 .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1275 .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1276 }
1277 c.record_fields.push(leaf_field);
1280 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1281 }
1282 DimensionNode::Group { graphql_name, description, children } => {
1283 let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1284 let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1285 let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1286
1287 let mut child_record_fields: Vec<Field> = Vec::new();
1288 let mut child_filter_fields: Vec<InputValue> = Vec::new();
1289 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1290
1291 let mut child_collector = DimCollector {
1292 cube_name: c.cube_name,
1293 compare_enum_name: c.compare_enum_name,
1294 filter_name: c.filter_name,
1295 record_fields: &mut child_record_fields,
1296 filter_fields: &mut child_filter_fields,
1297 extra_objects: c.extra_objects,
1298 extra_inputs: c.extra_inputs,
1299 extra_unions: c.extra_unions,
1300 };
1301 for child in children {
1302 collect_dimension_types(child, &new_prefix, &mut child_collector);
1303 }
1304
1305 let mut nested_record = Object::new(&nested_record_name);
1306 for f in child_record_fields { nested_record = nested_record.field(f); }
1307
1308 let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1309 let mut nested_filter = InputObject::new(&nested_filter_name)
1310 .description(nested_filter_desc);
1311 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1312
1313 let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1314 FieldFuture::new(async move {
1315 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1316 Ok(Some(FieldValue::owned_any(row.clone())))
1317 })
1318 });
1319 if let Some(desc) = description {
1320 group_field = group_field.description(desc);
1321 }
1322 c.record_fields.push(group_field);
1323 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1324 c.extra_objects.push(nested_record);
1325 c.extra_inputs.push(nested_filter);
1326 }
1327 DimensionNode::Array { graphql_name, description, children } => {
1328 let full_path = if prefix.is_empty() {
1329 graphql_name.clone()
1330 } else {
1331 format!("{prefix}_{graphql_name}")
1332 };
1333 let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1334 let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1335
1336 let mut element_obj = Object::new(&element_type_name);
1337 let mut includes_filter = InputObject::new(&includes_filter_name)
1338 .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1339
1340 let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1341
1342 for child in children {
1343 match &child.field_type {
1344 crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1345 let col_name = child.column.clone();
1346 let is_datetime = *dt == DimType::DateTime;
1347 let mut field = Field::new(
1348 &child.graphql_name,
1349 dim_type_to_typeref(dt),
1350 move |ctx| {
1351 let col = col_name.clone();
1352 FieldFuture::new(async move {
1353 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1354 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1355 let gql_val = if is_datetime {
1356 json_to_gql_datetime(val)
1357 } else {
1358 json_to_gql_value(val)
1359 };
1360 Ok(Some(FieldValue::value(gql_val)))
1361 })
1362 },
1363 );
1364 if let Some(desc) = &child.description {
1365 field = field.description(desc);
1366 }
1367 element_obj = element_obj.field(field);
1368 includes_filter = includes_filter.field(
1369 InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1370 );
1371 }
1372 crate::cube::definition::ArrayFieldType::Union(variants) => {
1373 let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1374
1375 let mut gql_union = Union::new(&union_name);
1376 let mut variant_objects = Vec::new();
1377
1378 for v in variants {
1379 let variant_obj_name = v.type_name.clone();
1380 let field_name = v.field_name.clone();
1381 let source_type = v.source_type.clone();
1382 let type_ref = dim_type_to_typeref(&source_type);
1383
1384 let val_col = child.column.clone();
1385 let variant_obj = Object::new(&variant_obj_name)
1386 .field(Field::new(
1387 &field_name,
1388 type_ref,
1389 move |ctx| {
1390 let col = val_col.clone();
1391 FieldFuture::new(async move {
1392 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1393 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1394 Ok(Some(FieldValue::value(json_to_gql_value(val))))
1395 })
1396 },
1397 ));
1398 gql_union = gql_union.possible_type(&variant_obj_name);
1399 variant_objects.push(variant_obj);
1400 }
1401
1402 let col_name = child.column.clone();
1403 let type_col = children.iter()
1404 .find(|f| f.graphql_name == "Type")
1405 .map(|f| f.column.clone())
1406 .unwrap_or_default();
1407 let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1408
1409 let mut field = Field::new(
1410 &child.graphql_name,
1411 TypeRef::named(&union_name),
1412 move |ctx| {
1413 let col = col_name.clone();
1414 let tcol = type_col.clone();
1415 let vars = variants_clone.clone();
1416 FieldFuture::new(async move {
1417 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1418 let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1419 if raw_val.is_null() || raw_val.as_str() == Some("") {
1420 return Ok(None);
1421 }
1422 let type_str = row.get(&tcol)
1423 .and_then(|v| v.as_str())
1424 .unwrap_or("");
1425 let resolved_type = resolve_union_typename(type_str, &vars);
1426 let mut elem = RowMap::new();
1427 elem.insert(col.clone(), raw_val);
1428 Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1429 })
1430 },
1431 );
1432 if let Some(desc) = &child.description {
1433 field = field.description(desc);
1434 }
1435 element_obj = element_obj.field(field);
1436
1437 includes_filter = includes_filter.field(
1438 InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1439 );
1440
1441 union_registrations.push((union_name, gql_union, variant_objects));
1442 }
1443 }
1444 }
1445
1446 for (_, union_type, variant_objs) in union_registrations {
1448 c.extra_objects.extend(variant_objs);
1449 c.extra_unions.push(union_type);
1457 }
1458
1459 let child_columns: Vec<(String, String)> = children.iter()
1461 .map(|f| (f.graphql_name.clone(), f.column.clone()))
1462 .collect();
1463 let element_type_name_clone = element_type_name.clone();
1464
1465 let mut array_field = Field::new(
1466 graphql_name,
1467 TypeRef::named_nn_list_nn(&element_type_name),
1468 move |ctx| {
1469 let cols = child_columns.clone();
1470 let _etype = element_type_name_clone.clone();
1471 FieldFuture::new(async move {
1472 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1473 let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1474 .map(|(gql_name, col)| {
1475 let arr = row.get(col)
1476 .and_then(|v| v.as_array())
1477 .cloned()
1478 .unwrap_or_default();
1479 (gql_name.as_str(), arr)
1480 })
1481 .collect();
1482
1483 let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1484 let mut elements = Vec::with_capacity(len);
1485 for i in 0..len {
1486 let mut elem = RowMap::new();
1487 for (gql_name, arr) in &arrays {
1488 let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1489 elem.insert(gql_name.to_string(), val);
1490 }
1491 for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1493 let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1494 elem.insert(col.clone(), val);
1495 }
1496 elements.push(FieldValue::owned_any(elem));
1497 }
1498 Ok(Some(FieldValue::list(elements)))
1499 })
1500 },
1501 );
1502 if let Some(desc) = description {
1503 array_field = array_field.description(desc);
1504 }
1505 c.record_fields.push(array_field);
1506
1507 let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
1511 let wrapper_filter = InputObject::new(&wrapper_filter_name)
1512 .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
1513 .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
1514 .description("Match rows where at least one array element satisfies all conditions"));
1515 c.extra_inputs.push(wrapper_filter);
1516
1517 c.filter_fields.push(InputValue::new(
1518 graphql_name,
1519 TypeRef::named(&wrapper_filter_name),
1520 ));
1521
1522 c.extra_objects.push(element_obj);
1523 c.extra_inputs.push(includes_filter);
1524 }
1525 }
1526}
1527
1528fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1534 for v in variants {
1535 if v.source_type_names.iter().any(|s| s == type_str) {
1536 return v.type_name.clone();
1537 }
1538 }
1539 variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1540}
1541
1542fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1543 match dt {
1544 DimType::String | DimType::Decimal | DimType::BigInteger | DimType::Date => TypeRef::named(TypeRef::STRING),
1545 DimType::DateTime => TypeRef::named("DateTime"),
1546 DimType::Int => TypeRef::named(TypeRef::INT),
1547 DimType::Float => TypeRef::named(TypeRef::FLOAT),
1548 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1549 }
1550}
1551
1552fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1553 match dt {
1554 DimType::String => "StringFilter",
1555 DimType::Int => "IntFilter",
1556 DimType::Float => "FloatFilter",
1557 DimType::Decimal => "DecimalFilter",
1558 DimType::BigInteger => "BigIntFilter",
1559 DimType::Date => "DateFilter",
1560 DimType::DateTime => "DateTimeFilter",
1561 DimType::Bool => "BoolFilter",
1562 }
1563}
1564
1565pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1566 match v {
1567 serde_json::Value::Null => Value::Null,
1568 serde_json::Value::Bool(b) => Value::from(b),
1569 serde_json::Value::Number(n) => {
1570 if let Some(i) = n.as_i64() { Value::from(i) }
1571 else if let Some(f) = n.as_f64() { Value::from(f) }
1572 else { Value::from(n.to_string()) }
1573 }
1574 serde_json::Value::String(s) => Value::from(s),
1575 _ => Value::from(v.to_string()),
1576 }
1577}
1578
1579fn build_join_expr(
1582 jd: &crate::cube::definition::JoinDef,
1583 target_cube: &CubeDefinition,
1584 sub_field: &async_graphql::SelectionField<'_>,
1585 network: &str,
1586 join_idx: usize,
1587) -> JoinExpr {
1588 let target_flat = target_cube.flat_dimensions();
1589 let target_table = target_cube.table_for_chain(network);
1590
1591 let mut requested_paths = HashSet::new();
1592 collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1593
1594 let mut selects: Vec<SelectExpr> = target_flat.iter()
1595 .filter(|(path, _)| requested_paths.contains(path))
1596 .map(|(_, dim)| SelectExpr::Column {
1597 column: dim.column.clone(),
1598 alias: None,
1599 })
1600 .collect();
1601
1602 if selects.is_empty() {
1603 selects = target_flat.iter()
1604 .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1605 .collect();
1606 }
1607
1608 let is_aggregate = target_flat.iter().any(|(_, dim)| is_aggregate_expr(&dim.column));
1609
1610 let group_by = if is_aggregate {
1611 let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1612 for sel in &selects {
1613 if let SelectExpr::Column { column, .. } = sel {
1614 if !is_aggregate_expr(column) && !gb.contains(column) {
1615 gb.push(column.clone());
1616 }
1617 }
1618 }
1619 gb
1620 } else {
1621 vec![]
1622 };
1623
1624 JoinExpr {
1625 schema: target_cube.schema.clone(),
1626 table: target_table,
1627 alias: format!("_j{}", join_idx),
1628 conditions: jd.conditions.clone(),
1629 selects,
1630 group_by,
1631 use_final: target_cube.use_final,
1632 is_aggregate,
1633 target_cube: jd.target_cube.clone(),
1634 join_field: sub_field.name().to_string(),
1635 join_type: jd.join_type.clone(),
1636 }
1637}
1638
1639fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1642 match v {
1643 serde_json::Value::String(s) => {
1644 let iso = if s.contains('T') {
1645 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1646 } else {
1647 let replaced = s.replacen(' ', "T", 1);
1648 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1649 };
1650 Value::from(iso)
1651 }
1652 other => json_to_gql_value(other),
1653 }
1654}