//! Dynamic GraphQL schema generator.
//!
//! File: activecube_rs/schema/generator.rs

use std::collections::HashSet;
use std::sync::Arc;

use async_graphql::dynamic::*;
use async_graphql::Value;

use crate::compiler;
use crate::compiler::ir::{DimAggType, FilterNode, JoinExpr, SelectExpr, SqlValue};
use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
use crate::cube::registry::CubeRegistry;
use crate::response::RowMap;
use crate::schema::filter_types;
use crate::sql::dialect::SqlDialect;
use crate::stats::{QueryStats, StatsCallback};
14
/// Canonical key naming for metric SQL aliases. Used by parser and resolver.
///
/// Prefixes the alias with a double underscore so metric result columns can
/// never collide with plain dimension aliases in a result row.
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Canonical key naming for dimension aggregate / time interval SQL aliases.
///
/// Uses the distinct `__da_` prefix so these columns are distinguishable from
/// plain metric aliases (see [`metric_key`]).
pub fn dim_agg_key(alias: &str) -> String {
    ["__da_", alias].concat()
}
20
/// Async function type that executes a compiled SQL query and returns rows.
/// The service layer provides this — the library never touches a database directly.
///
/// Arguments are the SQL text and its positional bind values; the returned
/// future resolves to the result rows or a string error message. `Send + Sync`
/// bounds allow the executor to be shared across resolver futures.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// Describes one top-level chain wrapper type (e.g. EVM, Solana, Trading).
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    /// Wrapper type name in the schema, e.g. "EVM", "Solana", "Trading".
    pub name: String,
    /// Which ChainGroup enum value this config represents.
    pub group: ChainGroup,
    /// Available networks. EVM: ["eth","bsc"]; Solana: ["sol"]; Trading: all chains.
    /// The first entry is used as the default network when no argument is given.
    pub networks: Vec<String>,
    /// If true the wrapper type takes a `network` argument (EVM style).
    /// If false the chain is implicit (Solana style) or cross-chain (Trading).
    pub has_network_arg: bool,
}
42
/// Configuration for supported networks (chains) and optional stats collection.
pub struct SchemaConfig {
    /// Top-level wrapper types exposed on the root query (EVM, Solana, ...).
    pub chain_groups: Vec<ChainGroupConfig>,
    /// Intended name of the root query type.
    /// NOTE(review): `build_schema` currently hard-codes "Query" and does not
    /// read this field — confirm whether it should be consulted.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    pub stats_callback: Option<StatsCallback>,
}
51
52impl Default for SchemaConfig {
53    fn default() -> Self {
54        Self {
55            chain_groups: vec![
56                ChainGroupConfig {
57                    name: "EVM".to_string(),
58                    group: ChainGroup::Evm,
59                    networks: vec!["eth".into(), "bsc".into()],
60                    has_network_arg: true,
61                },
62                ChainGroupConfig {
63                    name: "Solana".to_string(),
64                    group: ChainGroup::Solana,
65                    networks: vec!["sol".into()],
66                    has_network_arg: false,
67                },
68                ChainGroupConfig {
69                    name: "Trading".to_string(),
70                    group: ChainGroup::Trading,
71                    networks: vec!["sol".into(), "eth".into(), "bsc".into()],
72                    has_network_arg: false,
73                },
74            ],
75            root_query_name: "ChainStream".to_string(),
76            stats_callback: None,
77        }
78    }
79}
80
/// Stored in the parent value of chain wrapper types so cube resolvers can
/// retrieve the active chain without a `network` argument on themselves.
struct ChainContext {
    /// Active network slug (e.g. "eth", "bsc", "sol") chosen on the wrapper.
    network: String,
}
86
87/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
88///
89/// Schema shape (Bitquery-aligned):
90/// ```graphql
91/// type Query {
92///   EVM(network: EVMNetwork!): EVM!
93///   Solana: Solana!
94///   Trading: Trading!
95///   _cubeMetadata: String!
96/// }
97/// ```
98pub fn build_schema(
99    registry: CubeRegistry,
100    dialect: Arc<dyn SqlDialect>,
101    executor: QueryExecutor,
102    config: SchemaConfig,
103) -> Result<Schema, SchemaError> {
104    let mut builder = Schema::build("Query", None, None);
105
106    // --- shared input / enum types -------------------------------------------
107    builder = builder.register(filter_types::build_limit_input());
108    builder = builder.register(
109        InputObject::new("LimitByInput")
110            .description("Limit results per group (similar to ClickHouse LIMIT BY)")
111            .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
112                .description("Comma-separated dimension names to group by"))
113            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
114                .description("Maximum rows per group"))
115            .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
116                .description("Rows to skip per group")),
117    );
118    builder = builder.register(
119        Enum::new("OrderDirection")
120            .description("Sort direction")
121            .item(EnumItem::new("ASC").description("Ascending"))
122            .item(EnumItem::new("DESC").description("Descending")),
123    );
124    for input in filter_types::build_filter_primitives() {
125        builder = builder.register(input);
126    }
127    builder = builder.register(
128        Enum::new("TimeUnit")
129            .description("Time unit for interval bucketing")
130            .item(EnumItem::new("seconds"))
131            .item(EnumItem::new("minutes"))
132            .item(EnumItem::new("hours"))
133            .item(EnumItem::new("days"))
134            .item(EnumItem::new("weeks"))
135            .item(EnumItem::new("months")),
136    );
137    builder = builder.register(
138        InputObject::new("TimeIntervalInput")
139            .description("Time bucketing interval for DateTime dimensions")
140            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
141            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
142    );
143    builder = builder.register(
144        InputObject::new("DimSelectWhere")
145            .description("Post-aggregation filter for dimension values (HAVING clause)")
146            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
147            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
148            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
149            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
150            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
151            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
152    );
153
154    // --- per-group network enums ---------------------------------------------
155    for grp in &config.chain_groups {
156        if grp.has_network_arg {
157            let enum_name = format!("{}Network", grp.name);
158            let mut net_enum = Enum::new(&enum_name)
159                .description(format!("{} network selector", grp.name));
160            for net in &grp.networks {
161                net_enum = net_enum.item(EnumItem::new(net));
162            }
163            builder = builder.register(net_enum);
164        }
165    }
166
167    // --- register cube types once (shared across chain groups) ----------------
168    let mut registered_cubes: HashSet<String> = HashSet::new();
169    for cube in registry.cubes() {
170        if registered_cubes.contains(&cube.name) { continue; }
171        registered_cubes.insert(cube.name.clone());
172
173        let types = build_cube_types(cube);
174        for obj in types.objects { builder = builder.register(obj); }
175        for inp in types.inputs { builder = builder.register(inp); }
176        for en in types.enums { builder = builder.register(en); }
177        for un in types.unions { builder = builder.register(un); }
178    }
179
180    // --- build chain wrapper types + Query fields ----------------------------
181    let mut query = Object::new("Query");
182
183    for grp in &config.chain_groups {
184        let wrapper_type_name = grp.name.clone();
185
186        // Build the wrapper Object (e.g. "EVM", "Solana", "Trading") and attach
187        // cube fields that belong to this group.
188        let mut wrapper_obj = Object::new(&wrapper_type_name);
189
190        for cube in registry.cubes() {
191            if !cube.chain_groups.contains(&grp.group) { continue; }
192
193            let cube_field = build_cube_field(
194                cube,
195                dialect.clone(),
196                executor.clone(),
197                config.stats_callback.clone(),
198            );
199            wrapper_obj = wrapper_obj.field(cube_field);
200        }
201        builder = builder.register(wrapper_obj);
202
203        // Wrapper resolver on Query — stores ChainContext for child resolvers.
204        let default_network = grp.networks.first().cloned().unwrap_or_default();
205        let has_net_arg = grp.has_network_arg;
206        let net_enum_name = format!("{}Network", grp.name);
207        let wrapper_type_for_resolver = wrapper_type_name.clone();
208
209        let mut wrapper_field = Field::new(
210            &wrapper_type_name,
211            TypeRef::named_nn(&wrapper_type_name),
212            move |ctx| {
213                let default = default_network.clone();
214                let has_arg = has_net_arg;
215                FieldFuture::new(async move {
216                    let network = if has_arg {
217                        ctx.args.try_get("network")
218                            .ok()
219                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
220                            .unwrap_or(default)
221                    } else {
222                        default
223                    };
224                    Ok(Some(FieldValue::owned_any(ChainContext { network })))
225                })
226            },
227        );
228
229        if grp.has_network_arg {
230            wrapper_field = wrapper_field.argument(
231                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
232                    .description(format!("{} network to query", wrapper_type_for_resolver)),
233            );
234        }
235
236        query = query.field(wrapper_field);
237    }
238
239    // --- _cubeMetadata -------------------------------------------------------
240    let metadata_registry = Arc::new(registry.clone());
241    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
242        .map(|g| (g.name.clone(), g.group.clone()))
243        .collect();
244    let metadata_field = Field::new(
245        "_cubeMetadata",
246        TypeRef::named_nn(TypeRef::STRING),
247        move |_ctx| {
248            let reg = metadata_registry.clone();
249            let groups = metadata_groups.clone();
250            FieldFuture::new(async move {
251                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
252                for (group_name, group_enum) in &groups {
253                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
254                        .filter(|c| c.chain_groups.contains(group_enum))
255                        .map(serialize_cube_metadata)
256                        .collect();
257                    group_metadata.push(serde_json::json!({
258                        "group": group_name,
259                        "cubes": cubes_in_group,
260                    }));
261                }
262                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
263                Ok(Some(FieldValue::value(Value::from(json))))
264            })
265        },
266    )
267    .description("Internal: returns JSON metadata about all cubes grouped by chain");
268    query = query.field(metadata_field);
269
270    builder = builder.register(query);
271    builder = builder.data(registry);
272
273    builder.finish()
274}
275
/// Serialize one cube definition to a JSON object for the `_cubeMetadata` field.
///
/// Enum-like values (chain groups, types) are rendered via their `Debug`
/// representation; optional metric fields (`expressionTemplate`,
/// `description`) are only emitted when present on the definition.
fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
    serde_json::json!({
        "name": cube.name,
        "description": cube.description,
        "schema": cube.schema,
        "tablePattern": cube.table_pattern,
        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
        "metrics": cube.metrics.iter().map(|m| {
            let mut obj = serde_json::json!({
                "name": m.name,
                "returnType": format!("{:?}", m.return_type),
            });
            // Optional keys are inserted only when set, keeping output compact.
            if let Some(ref tmpl) = m.expression_template {
                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
            }
            if let Some(ref desc) = m.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        }).collect::<Vec<_>>(),
        "selectors": cube.selectors.iter().map(|s| {
            serde_json::json!({
                "name": s.graphql_name,
                "column": s.column,
                "type": format!("{:?}", s.dim_type),
            })
        }).collect::<Vec<_>>(),
        "dimensions": serialize_dims(&cube.dimensions),
        "joins": cube.joins.iter().map(|j| {
            serde_json::json!({
                "field": j.field_name,
                "target": j.target_cube,
                "joinType": format!("{:?}", j.join_type),
            })
        }).collect::<Vec<_>>(),
        "tableRoutes": cube.table_routes.iter().map(|r| {
            serde_json::json!({
                "schema": r.schema,
                "tablePattern": r.table_pattern,
                "availableColumns": r.available_columns,
                "priority": r.priority,
            })
        }).collect::<Vec<_>>(),
        "defaultLimit": cube.default_limit,
        "maxLimit": cube.max_limit,
    })
}
323
/// Build a single cube Field that reads `network` from the parent ChainContext.
///
/// The field returns a non-null list of `{CubeName}Record` objects and accepts
/// `where`, `limit`, `limitBy`, `orderBy` arguments plus one shorthand-filter
/// argument per selector declared on the cube.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    // Cloned up front so the resolver closure can own them.
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // The active network is stashed in the parent wrapper's
                // ChainContext. NOTE(review): falls back to "sol" when the
                // parent is not a ChainContext — confirm this default is
                // intended for all chain groups.
                let network = ctx.parent_value
                    .try_downcast_ref::<ChainContext>()
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Inspect the GraphQL selection set to learn exactly what the
                // client asked for, then compile it into the query IR.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // Attach a JoinExpr for every selected sub-field that matches
                // a declared join; unknown join targets are silently skipped.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Undo any alias renaming the dialect applied: move values
                // back under their original keys. `or_insert` keeps an
                // existing value if the original key is already occupied.
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Fold flat "{join_alias}.{col}" columns into one nested JSON
                // object per join, stored under the join's GraphQL field name.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            // Collect keys first — can't mutate while iterating.
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // A StatsCallback stored in schema data takes precedence over
                // the one captured when the field was built.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // One shorthand filter argument per selector, typed by its dimension type.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
472
473fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
474    serde_json::Value::Array(dims.iter().map(|d| match d {
475        DimensionNode::Leaf(dim) => {
476            let mut obj = serde_json::json!({
477                "name": dim.graphql_name,
478                "column": dim.column,
479                "type": format!("{:?}", dim.dim_type),
480            });
481            if let Some(desc) = &dim.description {
482                obj["description"] = serde_json::Value::String(desc.clone());
483            }
484            obj
485        },
486        DimensionNode::Group { graphql_name, description, children } => {
487            let mut obj = serde_json::json!({
488                "name": graphql_name,
489                "children": serialize_dims(children),
490            });
491            if let Some(desc) = description {
492                obj["description"] = serde_json::Value::String(desc.clone());
493            }
494            obj
495        },
496        DimensionNode::Array { graphql_name, description, children } => {
497            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
498                serde_json::json!({
499                    "name": f.graphql_name,
500                    "column": f.column,
501                    "type": format!("{:?}", f.field_type),
502                })
503            }).collect();
504            let mut obj = serde_json::json!({
505                "name": graphql_name,
506                "kind": "array",
507                "fields": fields,
508            });
509            if let Some(desc) = description {
510                obj["description"] = serde_json::Value::String(desc.clone());
511            }
512            obj
513        },
514    }).collect())
515}
516
/// Extract metric requests from the GraphQL selection set by inspecting
/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
/// the "count" field in the selection set and extract its `of` argument.
fn extract_metric_requests(
    ctx: &async_graphql::dynamic::ResolverContext,
    cube: &CubeDefinition,
) -> Vec<compiler::parser::MetricRequest> {
    let mut requests = Vec::new();

    for sub_field in ctx.ctx.field().selection_set() {
        let name = sub_field.name();
        // Only fields declared as metrics on this cube are considered.
        if !cube.has_metric(name) {
            continue;
        }

        // Fields whose arguments fail to resolve are skipped silently.
        let args = match sub_field.arguments() {
            Ok(args) => args,
            Err(_) => continue,
        };

        // `of` names the dimension the metric aggregates over; "*" means
        // "all rows" (e.g. a plain count()). Enum and string forms accepted.
        let of_dimension = args
            .iter()
            .find(|(k, _)| k.as_str() == "of")
            .and_then(|(_, v)| match v {
                async_graphql::Value::Enum(e) => Some(e.to_string()),
                async_graphql::Value::String(s) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_else(|| "*".to_string());

        // `selectWhere` is passed through raw; it is parsed downstream.
        let select_where_value = args
            .iter()
            .find(|(k, _)| k.as_str() == "selectWhere")
            .map(|(_, v)| v.clone());

        // `if` becomes a conditional-aggregation filter; empty filters are
        // normalized to None so downstream code can skip them.
        let condition_filter = args
            .iter()
            .find(|(k, _)| k.as_str() == "if")
            .and_then(|(_, v)| {
                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
            });

        // The GraphQL alias (or the metric name itself) keys the SQL alias.
        let gql_alias = sub_field.alias().unwrap_or(name).to_string();
        requests.push(compiler::parser::MetricRequest {
            function: name.to_string(),
            alias: gql_alias,
            of_dimension,
            select_where_value,
            condition_filter,
        });
    }

    requests
}
572
573fn extract_requested_fields(
574    ctx: &async_graphql::dynamic::ResolverContext,
575    cube: &CubeDefinition,
576) -> HashSet<String> {
577    let mut fields = HashSet::new();
578    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
579    fields
580}
581
582fn collect_selection_paths(
583    field: &async_graphql::SelectionField<'_>,
584    prefix: &str,
585    out: &mut HashSet<String>,
586    metrics: &[MetricDef],
587) {
588    for sub in field.selection_set() {
589        let name = sub.name();
590        if metrics.iter().any(|m| m.name == name) {
591            continue;
592        }
593        let path = if prefix.is_empty() {
594            name.to_string()
595        } else {
596            format!("{prefix}_{name}")
597        };
598        let has_children = sub.selection_set().next().is_some();
599        if has_children {
600            collect_selection_paths(&sub, &path, out, metrics);
601        } else {
602            out.insert(path);
603        }
604    }
605}
606
/// Maps `{parent_path}_{graphql_alias}` → ClickHouse column name for aliased dimension fields.
/// Used by `calculate` to resolve `$TokenSupplyUpdate_post` → column.
///
/// Kept as a Vec of pairs; entries are appended in selection order.
pub type FieldAliasMap = Vec<(String, String)>;
610
/// Describes a `quantile(of: ..., level: ...)` request.
pub struct QuantileRequest {
    /// GraphQL alias under which the result is returned (defaults to "quantile").
    pub alias: String,
    /// Dimension the quantile is computed over; "*" when `of` was absent.
    pub of_dimension: String,
    /// Quantile level; defaults to 0.5 (the median) when not supplied.
    pub level: f64,
}
617
/// Describes a `calculate(expression: "...")` request.
pub struct CalculateRequest {
    /// GraphQL alias for the computed column (defaults to "calculate").
    pub alias: String,
    /// Raw expression string from the client; `$`-prefixed names refer to
    /// aliased dimension fields (see [`FieldAliasMap`]).
    pub expression: String,
}
623
/// Describes a time interval bucketing request.
/// `Time(interval: {in: minutes, count: 1})` → replaces column with `toStartOfInterval(ts, ...)`
pub struct TimeIntervalRequest {
    /// Underscore-joined path of the field in the selection tree.
    pub field_path: String,
    /// Client-supplied alias (or field name) used for the result column.
    pub graphql_alias: String,
    /// Source column to bucket.
    pub column: String,
    /// Time unit name (a `TimeUnit` enum value, e.g. "minutes").
    pub unit: String,
    /// Number of units per bucket.
    pub count: i64,
}
633
/// Describes a dimension-level aggregation request extracted from the selection set.
/// `PostBalance(maximum: Block_Slot)` → DimAggRequest { agg_type: ArgMax, ... }
pub struct DimAggRequest {
    /// Underscore-joined path of the aggregated field in the selection tree.
    pub field_path: String,
    /// Client-supplied alias (or field name) for the result column.
    pub graphql_alias: String,
    /// Column whose value is returned (the argument of argMax/argMin).
    pub value_column: String,
    /// ArgMax for `maximum:` arguments, ArgMin for `minimum:`.
    pub agg_type: DimAggType,
    /// Column the maximum/minimum is taken over.
    pub compare_column: String,
    /// Optional `if:` condition; None when absent or when the parsed filter
    /// is empty.
    pub condition_filter: Option<FilterNode>,
    /// Raw `selectWhere:` value, parsed downstream.
    pub select_where_value: Option<async_graphql::Value>,
}
645
646fn extract_quantile_requests(
647    ctx: &async_graphql::dynamic::ResolverContext,
648) -> Vec<QuantileRequest> {
649    let mut requests = Vec::new();
650    for sub_field in ctx.ctx.field().selection_set() {
651        if sub_field.name() != "quantile" { continue; }
652        let args = match sub_field.arguments() {
653            Ok(a) => a,
654            Err(_) => continue,
655        };
656        let of_dim = args.iter()
657            .find(|(k, _)| k.as_str() == "of")
658            .and_then(|(_, v)| match v {
659                async_graphql::Value::Enum(e) => Some(e.to_string()),
660                async_graphql::Value::String(s) => Some(s.clone()),
661                _ => None,
662            })
663            .unwrap_or_else(|| "*".to_string());
664        let level = args.iter()
665            .find(|(k, _)| k.as_str() == "level")
666            .and_then(|(_, v)| match v {
667                async_graphql::Value::Number(n) => n.as_f64(),
668                _ => None,
669            })
670            .unwrap_or(0.5);
671        let alias = sub_field.alias().unwrap_or("quantile").to_string();
672        requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
673    }
674    requests
675}
676
677fn extract_field_aliases(
678    ctx: &async_graphql::dynamic::ResolverContext,
679    cube: &CubeDefinition,
680) -> FieldAliasMap {
681    let flat = cube.flat_dimensions();
682    let mut aliases = Vec::new();
683    collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
684    aliases
685}
686
/// Recursive worker for [`extract_field_aliases`]: walks the selection tree
/// and records `(alias_path, column)` for every aliased leaf that resolves
/// to a real dimension.
fn collect_field_aliases(
    field: &async_graphql::SelectionField<'_>,
    prefix: &str,
    flat: &[(String, crate::cube::definition::Dimension)],
    metrics: &[MetricDef],
    out: &mut FieldAliasMap,
) {
    for sub in field.selection_set() {
        let name = sub.name();
        // Metric and computed fields have their own alias handling elsewhere.
        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
            continue;
        }
        // Paths follow the cube's canonical `Parent_Child` underscore scheme.
        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
        let has_children = sub.selection_set().next().is_some();
        if has_children {
            collect_field_aliases(&sub, &path, flat, metrics, out);
        } else if let Some(alias) = sub.alias() {
            // Only record aliases whose canonical path matches a dimension.
            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
                // The alias path keeps the parent prefix but substitutes the
                // leaf name with the client alias (matches response keys).
                let alias_path = if prefix.is_empty() {
                    alias.to_string()
                } else {
                    format!("{prefix}_{alias}")
                };
                out.push((alias_path, dim.column.clone()));
            }
        }
    }
}
715
716fn extract_calculate_requests(
717    ctx: &async_graphql::dynamic::ResolverContext,
718) -> Vec<CalculateRequest> {
719    let mut requests = Vec::new();
720    for sub_field in ctx.ctx.field().selection_set() {
721        if sub_field.name() != "calculate" { continue; }
722        let args = match sub_field.arguments() {
723            Ok(a) => a,
724            Err(_) => continue,
725        };
726        let expression = args.iter()
727            .find(|(k, _)| k.as_str() == "expression")
728            .and_then(|(_, v)| match v {
729                async_graphql::Value::String(s) => Some(s.clone()),
730                _ => None,
731            });
732        if let Some(expr) = expression {
733            let alias = sub_field.alias().unwrap_or("calculate").to_string();
734            requests.push(CalculateRequest { alias, expression: expr });
735        }
736    }
737    requests
738}
739
740fn extract_dim_agg_requests(
741    ctx: &async_graphql::dynamic::ResolverContext,
742    cube: &CubeDefinition,
743) -> Vec<DimAggRequest> {
744    let flat = cube.flat_dimensions();
745    let mut requests = Vec::new();
746    collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
747    requests
748}
749
750fn collect_dim_agg_paths(
751    field: &async_graphql::SelectionField<'_>,
752    prefix: &str,
753    flat: &[(String, crate::cube::definition::Dimension)],
754    metrics: &[MetricDef],
755    cube: &CubeDefinition,
756    out: &mut Vec<DimAggRequest>,
757) {
758    for sub in field.selection_set() {
759        let name = sub.name();
760        if metrics.iter().any(|m| m.name == name) {
761            continue;
762        }
763        let path = if prefix.is_empty() {
764            name.to_string()
765        } else {
766            format!("{prefix}_{name}")
767        };
768        let has_children = sub.selection_set().next().is_some();
769        if has_children {
770            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
771        } else {
772            let args = match sub.arguments() {
773                Ok(a) => a,
774                Err(_) => continue,
775            };
776            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
777            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
778
779            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
780                let cp = match v {
781                    async_graphql::Value::Enum(e) => e.to_string(),
782                    async_graphql::Value::String(s) => s.clone(),
783                    _ => continue,
784                };
785                (DimAggType::ArgMax, cp)
786            } else if let Some((_, v)) = min_val {
787                let cp = match v {
788                    async_graphql::Value::Enum(e) => e.to_string(),
789                    async_graphql::Value::String(s) => s.clone(),
790                    _ => continue,
791                };
792                (DimAggType::ArgMin, cp)
793            } else {
794                continue;
795            };
796
797            let value_column = flat.iter()
798                .find(|(p, _)| p == &path)
799                .map(|(_, dim)| dim.column.clone());
800            let compare_column = flat.iter()
801                .find(|(p, _)| p == &compare_path)
802                .map(|(_, dim)| dim.column.clone());
803
804            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
805                let condition_filter = args.iter()
806                    .find(|(k, _)| k.as_str() == "if")
807                    .and_then(|(_, v)| {
808                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
809                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
810                    });
811                let select_where_value = args.iter()
812                    .find(|(k, _)| k.as_str() == "selectWhere")
813                    .map(|(_, v)| v.clone());
814
815                let gql_alias = sub.alias()
816                    .map(|a| a.to_string())
817                    .unwrap_or_else(|| path.clone());
818                out.push(DimAggRequest {
819                    field_path: path,
820                    graphql_alias: gql_alias,
821                    value_column: vc,
822                    agg_type,
823                    compare_column: cc,
824                    condition_filter,
825                    select_where_value,
826                });
827            }
828        }
829    }
830}
831
832fn extract_time_interval_requests(
833    ctx: &async_graphql::dynamic::ResolverContext,
834    cube: &CubeDefinition,
835) -> Vec<TimeIntervalRequest> {
836    let flat = cube.flat_dimensions();
837    let mut requests = Vec::new();
838    collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
839    requests
840}
841
842fn collect_time_interval_paths(
843    field: &async_graphql::SelectionField<'_>,
844    prefix: &str,
845    flat: &[(String, crate::cube::definition::Dimension)],
846    metrics: &[MetricDef],
847    out: &mut Vec<TimeIntervalRequest>,
848) {
849    for sub in field.selection_set() {
850        let name = sub.name();
851        if metrics.iter().any(|m| m.name == name) { continue; }
852        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
853        let has_children = sub.selection_set().next().is_some();
854        if has_children {
855            collect_time_interval_paths(&sub, &path, flat, metrics, out);
856        } else {
857            let args = match sub.arguments() {
858                Ok(a) => a,
859                Err(_) => continue,
860            };
861            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
862            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
863                let unit = obj.get("in")
864                    .and_then(|v| match v {
865                        async_graphql::Value::Enum(e) => Some(e.to_string()),
866                        async_graphql::Value::String(s) => Some(s.clone()),
867                        _ => None,
868                    });
869                let count = obj.get("count")
870                    .and_then(|v| match v {
871                        async_graphql::Value::Number(n) => n.as_i64(),
872                        _ => None,
873                    });
874                if let (Some(unit), Some(count)) = (unit, count) {
875                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
876                        let gql_alias = sub.alias().unwrap_or(name).to_string();
877                        out.push(TimeIntervalRequest {
878                            field_path: path,
879                            graphql_alias: gql_alias,
880                            column: dim.column.clone(),
881                            unit,
882                            count,
883                        });
884                    }
885                }
886            }
887        }
888    }
889}
890
891// ---------------------------------------------------------------------------
892// Per-Cube GraphQL type generation
893// ---------------------------------------------------------------------------
894
/// All GraphQL types generated for a single cube; the schema builder
/// registers each collection with the dynamic schema.
struct CubeTypes {
    // The cube's Record object plus nested group records, array element
    // objects, and union variant objects.
    objects: Vec<Object>,
    // Input types: the cube filter, order-by input, selectWhere inputs,
    // nested filters, and array includes-filters.
    inputs: Vec<InputObject>,
    // The compare-field enum and the per-metric / quantile `of` enums.
    enums: Vec<Enum>,
    // Union types produced by array dimensions with variant fields.
    unions: Vec<Union>,
}
901
/// Build every GraphQL type for one cube: the `<Cube>Record` object,
/// `<Cube>Filter` / `<Cube>OrderByInput` inputs, the compare-field enum,
/// per-metric `of` enums, and all nested types produced by dimensions.
///
/// Metric/quantile/calculate resolvers read their value from the parent
/// `RowMap` under the `metric_key` alias (`__{graphql alias}`), matching
/// how the compiler aliases aggregate SQL expressions.
fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
    let record_name = format!("{}Record", cube.name);
    let filter_name = format!("{}Filter", cube.name);
    let compare_enum_name = format!("{}CompareFields", cube.name);

    let flat_dims = cube.flat_dimensions();

    // One enum item per flattened dimension path; reused for ordering and
    // for maximum/minimum (argMax/argMin) arguments on leaf fields.
    let mut compare_enum = Enum::new(&compare_enum_name)
        .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
    for (path, _) in &flat_dims {
        compare_enum = compare_enum.item(EnumItem::new(path));
    }

    let mut record_fields: Vec<Field> = Vec::new();
    let mut filter_fields: Vec<InputValue> = Vec::new();
    let mut extra_objects: Vec<Object> = Vec::new();
    let mut extra_inputs: Vec<InputObject> = Vec::new();
    let mut extra_unions: Vec<Union> = Vec::new();

    // `any:` is the OR combinator — a list of sub-filters of the same type.
    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
        .description("OR combinator — matches if any sub-filter matches"));

    // Walk the dimension tree, appending record/filter fields and bubbling
    // nested types into the extra_* vecs. Scoped so the &mut borrows end
    // before the vecs are consumed below.
    {
        let mut collector = DimCollector {
            cube_name: &cube.name,
            compare_enum_name: &compare_enum_name,
            filter_name: &filter_name,
            record_fields: &mut record_fields,
            filter_fields: &mut filter_fields,
            extra_objects: &mut extra_objects,
            extra_inputs: &mut extra_inputs,
            extra_unions: &mut extra_unions,
        };
        for node in &cube.dimensions {
            collect_dimension_types(node, "", &mut collector);
        }
    }

    let mut metric_enums: Vec<Enum> = Vec::new();
    // Fallback descriptions for the built-in aggregates; a metric's own
    // description (if set) takes precedence below.
    let builtin_descs: std::collections::HashMap<&str, &str> = [
        ("count", "Count of rows or distinct values"),
        ("sum", "Sum of values"),
        ("avg", "Average of values"),
        ("min", "Minimum value"),
        ("max", "Maximum value"),
        ("uniq", "Count of unique (distinct) values"),
    ].into_iter().collect();

    for metric in &cube.metrics {
        let metric_name = &metric.name;
        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);

        // Metrics supporting HAVING get a dedicated comparison input type.
        // Values are strings so large numerics survive GraphQL transport.
        if metric.supports_where {
            extra_inputs.push(
                InputObject::new(&select_where_name)
                    .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
                    .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
                    .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
                    .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
                    .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
                    .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
            );
        }

        // Per-metric `of:` enum listing every flattened dimension path.
        let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
        let mut of_enum = Enum::new(&of_enum_name)
            .description(format!("Dimension to apply {} aggregation on", metric_name));
        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
        metric_enums.push(of_enum);

        let metric_name_clone = metric_name.clone();
        let return_type_ref = dim_type_to_typeref(&metric.return_type);
        let metric_desc = metric.description.as_deref()
            .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
            .unwrap_or("Aggregate metric");

        // Resolver: look up `__{alias}` (falling back to the metric name
        // when no alias was given) in the parent row.
        let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
            let default_name = metric_name_clone.clone();
            FieldFuture::new(async move {
                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
                let key = metric_key(alias);
                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                Ok(Some(FieldValue::value(json_to_gql_value(val))))
            })
        })
        .description(metric_desc)
        .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
            .description("Dimension to aggregate on (default: all rows)"));

        if metric.supports_where {
            metric_field = metric_field
                .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
                    .description("Post-aggregation filter (HAVING)"))
                .argument(InputValue::new("if", TypeRef::named(&filter_name))
                    .description("Conditional filter for this metric"));
        }

        record_fields.push(metric_field);
    }

    // `quantile(of: ..., level: ...)` — percentile computation
    {
        let of_enum_name = format!("{}_quantile_Of", cube.name);
        let mut of_enum = Enum::new(&of_enum_name)
            .description(format!("Dimension to apply quantile on for {}", cube.name));
        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
        metric_enums.push(of_enum);

        // Clone so the enum name outlives this block for the argument builder.
        let of_enum_for_closure = of_enum_name.clone();
        let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
            FieldFuture::new(async move {
                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                let alias = ctx.ctx.field().alias().unwrap_or("quantile");
                let key = metric_key(alias);
                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                Ok(Some(FieldValue::value(json_to_gql_value(val))))
            })
        })
        .description("Compute a quantile (percentile) of a dimension")
        .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
            .description("Dimension to compute quantile on"))
        .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
            .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
        record_fields.push(quantile_field);
    }

    // `calculate(expression: "...")` — runtime expression computation
    {
        let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
            FieldFuture::new(async move {
                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                let alias = ctx.ctx.field().alias().unwrap_or("calculate");
                let key = metric_key(alias);
                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                Ok(Some(FieldValue::value(json_to_gql_value(val))))
            })
        })
        .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
        .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
            .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
        record_fields.push(calculate_field);
    }

    // Add join fields: joinXxx returns the target cube's Record type.
    // The joined sub-row is expected pre-nested in the parent row as a JSON
    // object keyed by the join field name.
    for jd in &cube.joins {
        let target_record_name = format!("{}Record", jd.target_cube);
        let field_name_owned = jd.field_name.clone();
        let mut join_field = Field::new(
            &jd.field_name,
            TypeRef::named(&target_record_name),
            move |ctx| {
                let field_name = field_name_owned.clone();
                FieldFuture::new(async move {
                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                    if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
                        let sub_row: RowMap = obj.iter()
                            .map(|(k, v)| (k.clone(), v.clone()))
                            .collect();
                        Ok(Some(FieldValue::owned_any(sub_row)))
                    } else {
                        // No nested object present — resolve the join as null.
                        Ok(Some(FieldValue::value(Value::Null)))
                    }
                })
            },
        );
        if let Some(desc) = &jd.description {
            join_field = join_field.description(desc);
        }
        record_fields.push(join_field);
    }

    // Assemble the final type collections; the cube's own record/filter come
    // first, followed by the nested types collected along the way.
    let mut record = Object::new(&record_name);
    for f in record_fields { record = record.field(f); }

    let mut filter = InputObject::new(&filter_name)
        .description(format!("Filter conditions for {} query", cube.name));
    for f in filter_fields { filter = filter.field(f); }

    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let orderby_input = InputObject::new(&orderby_input_name)
        .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
        .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
            .description("Sort descending by this field"))
        .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
            .description("Sort ascending by this field"))
        .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
            .description("Sort descending by computed/aggregated field name"))
        .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
            .description("Sort ascending by computed/aggregated field name"));

    let mut objects = vec![record]; objects.extend(extra_objects);
    let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
    let mut enums = vec![compare_enum]; enums.extend(metric_enums);

    CubeTypes { objects, inputs, enums, unions: extra_unions }
}
1099
/// Mutable accumulator threaded through `collect_dimension_types`: fields
/// for the current nesting level go into `record_fields`/`filter_fields`,
/// while generated nested types bubble up through the `extra_*` vecs.
struct DimCollector<'a> {
    // Cube name, used to namespace generated nested type names.
    cube_name: &'a str,
    // Cube-wide compare-field enum name (for maximum/minimum arguments).
    compare_enum_name: &'a str,
    // Cube-wide filter input name (for per-field `if:` arguments).
    filter_name: &'a str,
    // Record-object fields being collected at the current level.
    record_fields: &'a mut Vec<Field>,
    // Filter-input fields being collected at the current level.
    filter_fields: &'a mut Vec<InputValue>,
    // Nested record / element / union-variant object types.
    extra_objects: &'a mut Vec<Object>,
    // Nested filter and array includes-filter input types.
    extra_inputs: &'a mut Vec<InputObject>,
    // Union types generated for array variant fields.
    extra_unions: &'a mut Vec<Union>,
}
1110
/// Recursively translate one dimension node into GraphQL schema pieces.
///
/// - `Leaf`: a scalar field on the record (with argMax/argMin/interval
///   arguments) plus a typed filter input field.
/// - `Group`: a nested record object and nested filter input; the parent
///   row is passed through unchanged so child resolvers read the same map.
/// - `Array`: an element object type resolved by zipping parallel array
///   columns, an includes-filter input, and optional union variant types.
///
/// `prefix` is the underscore-joined path of enclosing groups; generated
/// type names are namespaced as `{cube}_{path}_...`.
fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
    match node {
        DimensionNode::Leaf(dim) => {
            let col = dim.column.clone();
            let is_datetime = dim.dim_type == DimType::DateTime;
            let compare_enum = c.compare_enum_name.to_string();
            let cube_filter = c.filter_name.to_string();
            let mut leaf_field = Field::new(
                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
                move |ctx| {
                    let col = col.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        // When an aggregation/interval argument is present,
                        // the value lives under the `__da_{alias}` key the
                        // compiler generated; otherwise read the raw column.
                        let has_interval = ctx.args.try_get("interval").is_ok();
                        let has_max = ctx.args.try_get("maximum").is_ok();
                        let has_min = ctx.args.try_get("minimum").is_ok();
                        let key = if has_interval || has_max || has_min {
                            // NOTE(review): falls back to the column name when
                            // no alias is set, while the extraction side
                            // (collect_dim_agg_paths) falls back to the field
                            // path — confirm these agree when column != path.
                            let name = ctx.ctx.field().alias().unwrap_or(&col);
                            dim_agg_key(name)
                        } else {
                            col.clone()
                        };
                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                        let gql_val = if is_datetime {
                            json_to_gql_datetime(val)
                        } else {
                            json_to_gql_value(val)
                        };
                        Ok(Some(FieldValue::value(gql_val)))
                    })
                },
            );
            if let Some(desc) = &dim.description {
                leaf_field = leaf_field.description(desc);
            }
            // Every leaf accepts argMax/argMin plus conditional and
            // post-aggregation filters.
            leaf_field = leaf_field
                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is maximum (argMax)"))
                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
                    .description("Return value from row where compare field is minimum (argMin)"))
                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
                    .description("Conditional filter for aggregation"))
                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
                    .description("Post-aggregation value filter (HAVING)"));
            // Only datetime leaves can be time-bucketed.
            if is_datetime {
                leaf_field = leaf_field
                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
            }
            // The resolver above already switches its lookup key when an
            // "interval"/"maximum"/"minimum" argument is present.
            c.record_fields.push(leaf_field);
            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
        }
        DimensionNode::Group { graphql_name, description, children } => {
            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);

            let mut child_record_fields: Vec<Field> = Vec::new();
            let mut child_filter_fields: Vec<InputValue> = Vec::new();
            // NOTE(review): identical to full_path above — could be reused.
            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };

            // Recurse into children with fresh field vecs; nested types still
            // bubble up through the shared extra_* vecs.
            let mut child_collector = DimCollector {
                cube_name: c.cube_name,
                compare_enum_name: c.compare_enum_name,
                filter_name: c.filter_name,
                record_fields: &mut child_record_fields,
                filter_fields: &mut child_filter_fields,
                extra_objects: c.extra_objects,
                extra_inputs: c.extra_inputs,
                extra_unions: c.extra_unions,
            };
            for child in children {
                collect_dimension_types(child, &new_prefix, &mut child_collector);
            }

            let mut nested_record = Object::new(&nested_record_name);
            for f in child_record_fields { nested_record = nested_record.field(f); }

            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
            let mut nested_filter = InputObject::new(&nested_filter_name)
                .description(nested_filter_desc);
            for f in child_filter_fields { nested_filter = nested_filter.field(f); }

            // Group resolver: pass the whole parent row through so child
            // leaf resolvers can read their own columns from it.
            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
                FieldFuture::new(async move {
                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                    Ok(Some(FieldValue::owned_any(row.clone())))
                })
            });
            if let Some(desc) = description {
                group_field = group_field.description(desc);
            }
            c.record_fields.push(group_field);
            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
            c.extra_objects.push(nested_record);
            c.extra_inputs.push(nested_filter);
        }
        DimensionNode::Array { graphql_name, description, children } => {
            let full_path = if prefix.is_empty() {
                graphql_name.clone()
            } else {
                format!("{prefix}_{graphql_name}")
            };
            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);

            let mut element_obj = Object::new(&element_type_name);
            let mut includes_filter = InputObject::new(&includes_filter_name)
                .description(format!("Element-level filter for {} (used with includes)", graphql_name));

            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();

            for child in children {
                match &child.field_type {
                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
                        // Scalar element field: read the per-element value the
                        // list resolver below stored under the column name.
                        let col_name = child.column.clone();
                        let is_datetime = *dt == DimType::DateTime;
                        let mut field = Field::new(
                            &child.graphql_name,
                            dim_type_to_typeref(dt),
                            move |ctx| {
                                let col = col_name.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    let gql_val = if is_datetime {
                                        json_to_gql_datetime(val)
                                    } else {
                                        json_to_gql_value(val)
                                    };
                                    Ok(Some(FieldValue::value(gql_val)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);
                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
                        );
                    }
                    crate::cube::definition::ArrayFieldType::Union(variants) => {
                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);

                        let mut gql_union = Union::new(&union_name);
                        let mut variant_objects = Vec::new();

                        // One object type per variant, each exposing a single
                        // typed field backed by the same value column.
                        for v in variants {
                            let variant_obj_name = v.type_name.clone();
                            let field_name = v.field_name.clone();
                            let source_type = v.source_type.clone();
                            let type_ref = dim_type_to_typeref(&source_type);

                            let val_col = child.column.clone();
                            let variant_obj = Object::new(&variant_obj_name)
                                .field(Field::new(
                                    &field_name,
                                    type_ref,
                                    move |ctx| {
                                        let col = val_col.clone();
                                        FieldFuture::new(async move {
                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
                                        })
                                    },
                                ));
                            gql_union = gql_union.possible_type(&variant_obj_name);
                            variant_objects.push(variant_obj);
                        }

                        let col_name = child.column.clone();
                        // NOTE(review): assumes a sibling array field literally
                        // named "Type" carries the variant discriminator —
                        // confirm against cube definitions.
                        let type_col = children.iter()
                            .find(|f| f.graphql_name == "Type")
                            .map(|f| f.column.clone())
                            .unwrap_or_default();
                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();

                        let mut field = Field::new(
                            &child.graphql_name,
                            TypeRef::named(&union_name),
                            move |ctx| {
                                let col = col_name.clone();
                                let tcol = type_col.clone();
                                let vars = variants_clone.clone();
                                FieldFuture::new(async move {
                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                                    // Null and empty-string values resolve the
                                    // whole union field to null.
                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
                                        return Ok(None);
                                    }
                                    let type_str = row.get(&tcol)
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("");
                                    // Pick the concrete __typename from the
                                    // discriminator string, then hand the
                                    // variant resolver a one-entry row.
                                    let resolved_type = resolve_union_typename(type_str, &vars);
                                    let mut elem = RowMap::new();
                                    elem.insert(col.clone(), raw_val);
                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
                                })
                            },
                        );
                        if let Some(desc) = &child.description {
                            field = field.description(desc);
                        }
                        element_obj = element_obj.field(field);

                        includes_filter = includes_filter.field(
                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
                        );

                        union_registrations.push((union_name, gql_union, variant_objects));
                    }
                }
            }

            // Register union types and variant objects: variant objects are
            // plain object types, while the union itself is carried up
            // through `extra_unions` for the schema builder to register.
            for (_, union_type, variant_objs) in union_registrations {
                c.extra_objects.extend(variant_objs);
                c.extra_unions.push(union_type);
            }

            // Build the list resolver: zip parallel array columns into element objects
            let child_columns: Vec<(String, String)> = children.iter()
                .map(|f| (f.graphql_name.clone(), f.column.clone()))
                .collect();
            let element_type_name_clone = element_type_name.clone();

            let mut array_field = Field::new(
                graphql_name,
                TypeRef::named_nn_list_nn(&element_type_name),
                move |ctx| {
                    let cols = child_columns.clone();
                    let _etype = element_type_name_clone.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        // Pull each child's backing array out of the row;
                        // missing columns become empty arrays.
                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
                            .map(|(gql_name, col)| {
                                let arr = row.get(col)
                                    .and_then(|v| v.as_array())
                                    .cloned()
                                    .unwrap_or_default();
                                (gql_name.as_str(), arr)
                            })
                            .collect();

                        // Element count is taken from the first column's
                        // array; shorter arrays are padded with null.
                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
                        let mut elements = Vec::with_capacity(len);
                        for i in 0..len {
                            let mut elem = RowMap::new();
                            for (gql_name, arr) in &arrays {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(gql_name.to_string(), val);
                            }
                            // Also store by column name for Union resolvers
                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
                                elem.insert(col.clone(), val);
                            }
                            elements.push(FieldValue::owned_any(elem));
                        }
                        Ok(Some(FieldValue::list(elements)))
                    })
                },
            );
            if let Some(desc) = description {
                array_field = array_field.description(desc);
            }
            c.record_fields.push(array_field);

            // Add includes filter for the array dimension
            c.filter_fields.push(InputValue::new(
                graphql_name,
                TypeRef::named(&includes_filter_name),
            ));

            c.extra_objects.push(element_obj);
            c.extra_inputs.push(includes_filter);
        }
    }
}
1401
1402/// Resolve the GraphQL Union __typename from an ABI type string.
1403fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1404    let fallback = variants.last().map(|v| v.type_name.clone()).unwrap_or_default();
1405    match type_str {
1406        "u8" | "u16" | "u32" | "i32" => {
1407            variants.iter().find(|v| v.field_name == "integer")
1408                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1409        }
1410        "u64" | "u128" | "i64" => {
1411            variants.iter().find(|v| v.field_name == "bigInteger")
1412                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1413        }
1414        "string" => {
1415            variants.iter().find(|v| v.field_name == "string")
1416                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1417        }
1418        "publicKey" | "pubkey" => {
1419            variants.iter().find(|v| v.field_name == "address")
1420                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1421        }
1422        "bool" => {
1423            variants.iter().find(|v| v.field_name == "bool")
1424                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1425        }
1426        _ => fallback,
1427    }
1428}
1429
1430fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1431    match dt {
1432        DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
1433        DimType::Int => TypeRef::named(TypeRef::INT),
1434        DimType::Float => TypeRef::named(TypeRef::FLOAT),
1435        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1436    }
1437}
1438
1439fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1440    match dt {
1441        DimType::String => "StringFilter",
1442        DimType::Int => "IntFilter",
1443        DimType::Float => "FloatFilter",
1444        DimType::DateTime => "DateTimeFilter",
1445        DimType::Bool => "BoolFilter",
1446    }
1447}
1448
1449pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1450    match v {
1451        serde_json::Value::Null => Value::Null,
1452        serde_json::Value::Bool(b) => Value::from(b),
1453        serde_json::Value::Number(n) => {
1454            if let Some(i) = n.as_i64() { Value::from(i) }
1455            else if let Some(f) = n.as_f64() { Value::from(f) }
1456            else { Value::from(n.to_string()) }
1457        }
1458        serde_json::Value::String(s) => Value::from(s),
1459        _ => Value::from(v.to_string()),
1460    }
1461}
1462
1463/// Build a JoinExpr from a JoinDef and target cube definition.
1464/// Inspects the sub-selection of the join field to determine which columns to SELECT.
1465fn build_join_expr(
1466    jd: &crate::cube::definition::JoinDef,
1467    target_cube: &CubeDefinition,
1468    sub_field: &async_graphql::SelectionField<'_>,
1469    network: &str,
1470    join_idx: usize,
1471) -> JoinExpr {
1472    let target_flat = target_cube.flat_dimensions();
1473    let target_table = target_cube.table_for_chain(network);
1474
1475    let mut requested_paths = HashSet::new();
1476    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1477
1478    let mut selects: Vec<SelectExpr> = target_flat.iter()
1479        .filter(|(path, _)| requested_paths.contains(path))
1480        .map(|(_, dim)| SelectExpr::Column {
1481            column: dim.column.clone(),
1482            alias: None,
1483        })
1484        .collect();
1485
1486    if selects.is_empty() {
1487        selects = target_flat.iter()
1488            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1489            .collect();
1490    }
1491
1492    let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
1493
1494    let group_by = if is_aggregate {
1495        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1496        for sel in &selects {
1497            if let SelectExpr::Column { column, .. } = sel {
1498                if !column.contains('(') && !gb.contains(column) {
1499                    gb.push(column.clone());
1500                }
1501            }
1502        }
1503        gb
1504    } else {
1505        vec![]
1506    };
1507
1508    JoinExpr {
1509        schema: target_cube.schema.clone(),
1510        table: target_table,
1511        alias: format!("_j{}", join_idx),
1512        conditions: jd.conditions.clone(),
1513        selects,
1514        group_by,
1515        use_final: target_cube.use_final,
1516        is_aggregate,
1517        target_cube: jd.target_cube.clone(),
1518        join_field: sub_field.name().to_string(),
1519        join_type: jd.join_type.clone(),
1520    }
1521}
1522
1523/// Convert a ClickHouse DateTime value to ISO 8601 format.
1524/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
1525fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1526    match v {
1527        serde_json::Value::String(s) => {
1528            let iso = if s.contains('T') {
1529                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1530            } else {
1531                let replaced = s.replacen(' ', "T", 1);
1532                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1533            };
1534            Value::from(iso)
1535        }
1536        other => json_to_gql_value(other),
1537    }
1538}