// File: activecube_rs/schema/generator.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Canonical key naming for metric SQL aliases. Used by parser and resolver.
pub fn metric_key(alias: &str) -> String {
    // Prefix the alias with "__" without going through the formatter.
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Canonical key naming for dimension aggregate / time interval SQL aliases.
pub fn dim_agg_key(alias: &str) -> String {
    // Same "__da_" prefix as before, built by slice concatenation.
    ["__da_", alias].concat()
}
20
/// Async function type that executes a compiled SQL query and returns rows.
/// The service layer provides this — the library never touches a database directly.
///
/// Arguments: the compiled SQL text and its positional bind values.
/// The returned future resolves to the result rows, or a `String` error
/// message that `build_cube_field` wraps into a GraphQL error.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// A custom argument on the chain wrapper field (e.g. dataset, aggregates).
/// The library registers it on the GraphQL wrapper type and stores the resolved
/// value in `ChainContext.extra` for child cube resolvers to read.
///
/// The wrapper resolver only captures enum, boolean, and string values
/// (see the argument-resolution loop in `build_schema`); other value kinds
/// are silently skipped.
#[derive(Debug, Clone)]
pub struct WrapperArg {
    /// GraphQL argument name (e.g. "dataset").
    pub name: String,
    /// Name of a type already registered in the schema (e.g. "Dataset").
    pub type_ref: String,
    /// Description shown in schema introspection.
    pub description: String,
    /// Enum values, used by builder schema generation. None for non-enum types.
    pub values: Option<Vec<String>>,
}
43
/// Describes one top-level chain wrapper type (e.g. EVM, Solana, Trading).
///
/// `build_schema` registers one GraphQL object per config and attaches every
/// cube whose `chain_groups` contains `group`.
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    /// Wrapper type name in the schema, e.g. "EVM", "Solana", "Trading".
    pub name: String,
    /// Which ChainGroup enum value this config represents.
    pub group: ChainGroup,
    /// Available networks. EVM: ["eth","bsc"]; Solana: ["sol"]; Trading: all chains.
    /// The first entry is used as the default network by the wrapper resolver.
    pub networks: Vec<String>,
    /// If true the wrapper type takes a `network` argument (EVM style).
    /// If false the chain is implicit (Solana style) or cross-chain (Trading).
    pub has_network_arg: bool,
    /// Additional arguments on the wrapper field, passed through to child resolvers
    /// via `ChainContext.extra`.
    pub extra_args: Vec<WrapperArg>,
}
60
/// Callback to transform the resolved table name based on `ChainContext`.
/// Called after `resolve_table` picks the base table, before SQL compilation.
/// Arguments: `(table_name, chain_context) -> transformed_table_name`.
///
/// Installed into schema data by `build_schema` and looked up by the cube
/// resolver via `ctx.data::<TableNameTransform>()`.
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
65
/// Configuration for supported networks (chains) and optional stats collection.
pub struct SchemaConfig {
    /// One entry per top-level wrapper type (EVM, Solana, Trading, ...).
    pub chain_groups: Vec<ChainGroupConfig>,
    // NOTE(review): `root_query_name` is not read by `build_schema`, which
    // hardcodes the root type name to "Query" — confirm whether this field
    // should be wired through or removed.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    pub stats_callback: Option<StatsCallback>,
    /// Optional hook to transform table names based on chain context (e.g. dataset suffix).
    pub table_name_transform: Option<TableNameTransform>,
    /// Additional global enum types to register in the schema (e.g. Dataset, Aggregates).
    pub extra_types: Vec<Enum>,
}
78
79impl Default for SchemaConfig {
80    fn default() -> Self {
81        Self {
82            chain_groups: vec![
83                ChainGroupConfig {
84                    name: "EVM".to_string(),
85                    group: ChainGroup::Evm,
86                    networks: vec!["eth".into(), "bsc".into()],
87                    has_network_arg: true,
88                    extra_args: vec![],
89                },
90                ChainGroupConfig {
91                    name: "Solana".to_string(),
92                    group: ChainGroup::Solana,
93                    networks: vec!["sol".into()],
94                    has_network_arg: false,
95                    extra_args: vec![],
96                },
97                ChainGroupConfig {
98                    name: "Trading".to_string(),
99                    group: ChainGroup::Trading,
100                    networks: vec!["sol".into(), "eth".into(), "bsc".into()],
101                    has_network_arg: false,
102                    extra_args: vec![],
103                },
104            ],
105            root_query_name: "ChainStream".to_string(),
106            stats_callback: None,
107            table_name_transform: None,
108            extra_types: vec![],
109        }
110    }
111}
112
/// Stored in the parent value of chain wrapper types so cube resolvers can
/// retrieve the active chain without a `network` argument on themselves.
/// Service-layer extra_args values are available in the `extra` map.
pub struct ChainContext {
    /// Active network id (e.g. "eth", "bsc", "sol"), resolved from the
    /// wrapper's `network` argument or the group's first configured network.
    pub network: String,
    /// Stringified extra wrapper-argument values, keyed by argument name.
    pub extra: HashMap<String, String>,
}
120
121/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
122///
123/// Schema shape (Bitquery-aligned):
124/// ```graphql
125/// type Query {
126///   EVM(network: EVMNetwork!): EVM!
127///   Solana: Solana!
128///   Trading: Trading!
129///   _cubeMetadata: String!
130/// }
131/// ```
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    // NOTE(review): the root type name is hardcoded to "Query" here and for the
    // `Object` below; `config.root_query_name` is never read in this function.
    let mut builder = Schema::build("Query", None, None);

    // --- shared input / enum types -------------------------------------------
    builder = builder.register(filter_types::build_limit_input());
    // LimitByInput is now generated per-cube in build_cube_types() with typed enum for `by`
    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }
    builder = builder.register(
        Enum::new("TimeUnit")
            .description("Time unit for interval bucketing")
            .item(EnumItem::new("seconds"))
            .item(EnumItem::new("minutes"))
            .item(EnumItem::new("hours"))
            .item(EnumItem::new("days"))
            .item(EnumItem::new("weeks"))
            .item(EnumItem::new("months")),
    );
    builder = builder.register(
        InputObject::new("TimeIntervalInput")
            .description("Time bucketing interval for DateTime dimensions")
            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
    );
    // All comparison operands are strings; parsing to the dimension's native
    // type presumably happens downstream — TODO confirm in the compiler.
    builder = builder.register(
        InputObject::new("DimSelectWhere")
            .description("Post-aggregation filter for dimension values (HAVING clause)")
            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
    );

    // Consumes `config.extra_types` (partial move); the remaining fields of
    // `config` are still used below.
    for extra_enum in config.extra_types {
        builder = builder.register(extra_enum);
    }

    // --- per-group network enums ---------------------------------------------
    // Only groups with an explicit `network` argument get a "<Name>Network" enum.
    for grp in &config.chain_groups {
        if grp.has_network_arg {
            let enum_name = format!("{}Network", grp.name);
            let mut net_enum = Enum::new(&enum_name)
                .description(format!("{} network selector", grp.name));
            for net in &grp.networks {
                net_enum = net_enum.item(EnumItem::new(net));
            }
            builder = builder.register(net_enum);
        }
    }

    // --- register cube types once (shared across chain groups) ----------------
    // Deduplicate by cube name so a cube appearing in multiple groups only
    // registers its GraphQL types a single time.
    let mut registered_cubes: HashSet<String> = HashSet::new();
    for cube in registry.cubes() {
        if registered_cubes.contains(&cube.name) { continue; }
        registered_cubes.insert(cube.name.clone());

        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }
        for un in types.unions { builder = builder.register(un); }
    }

    // --- build chain wrapper types + Query fields ----------------------------
    let mut query = Object::new("Query");

    for grp in &config.chain_groups {
        let wrapper_type_name = grp.name.clone();

        // Build the wrapper Object (e.g. "EVM", "Solana", "Trading") and attach
        // cube fields that belong to this group.
        let mut wrapper_obj = Object::new(&wrapper_type_name);

        for cube in registry.cubes() {
            if !cube.chain_groups.contains(&grp.group) { continue; }

            let cube_field = build_cube_field(
                cube,
                dialect.clone(),
                executor.clone(),
                config.stats_callback.clone(),
            );
            wrapper_obj = wrapper_obj.field(cube_field);
        }
        builder = builder.register(wrapper_obj);

        // Wrapper resolver on Query — stores ChainContext for child resolvers.
        // Everything the closure needs is cloned up front because the resolver
        // must be 'static.
        let default_network = grp.networks.first().cloned().unwrap_or_default();
        let has_net_arg = grp.has_network_arg;
        let net_enum_name = format!("{}Network", grp.name);
        let wrapper_type_for_resolver = wrapper_type_name.clone();
        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();

        let mut wrapper_field = Field::new(
            &wrapper_type_name,
            TypeRef::named_nn(&wrapper_type_name),
            move |ctx| {
                let default = default_network.clone();
                let has_arg = has_net_arg;
                let arg_names = extra_arg_names.clone();
                FieldFuture::new(async move {
                    // Groups without a `network` argument always use the default
                    // (first configured) network.
                    let network = if has_arg {
                        ctx.args.try_get("network")
                            .ok()
                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
                            .unwrap_or(default)
                    } else {
                        default
                    };
                    // Stringify extra wrapper args (enum > bool > string, first
                    // accessor that succeeds wins); other value kinds are dropped.
                    let mut extra = HashMap::new();
                    for arg_name in &arg_names {
                        if let Ok(val) = ctx.args.try_get(arg_name) {
                            let resolved = val.enum_name().ok().map(|s| s.to_string())
                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
                                .or_else(|| val.string().ok().map(|s| s.to_string()));
                            if let Some(v) = resolved {
                                extra.insert(arg_name.clone(), v);
                            }
                        }
                    }
                    // ChainContext becomes the parent value for cube resolvers.
                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
                })
            },
        );

        if grp.has_network_arg {
            wrapper_field = wrapper_field.argument(
                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
                    .description(format!("{} network to query", wrapper_type_for_resolver)),
            );
        }
        for wa in &grp.extra_args {
            wrapper_field = wrapper_field.argument(
                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
                    .description(&wa.description),
            );
        }

        query = query.field(wrapper_field);
    }

    // --- _cubeMetadata -------------------------------------------------------
    // Introspection-style helper: serializes every cube, grouped by chain group,
    // as a JSON string.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
        .map(|g| (g.name.clone(), g.group.clone()))
        .collect();
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            let groups = metadata_groups.clone();
            FieldFuture::new(async move {
                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
                for (group_name, group_enum) in &groups {
                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
                        .filter(|c| c.chain_groups.contains(group_enum))
                        .map(serialize_cube_metadata)
                        .collect();
                    group_metadata.push(serde_json::json!({
                        "group": group_name,
                        "cubes": cubes_in_group,
                    }));
                }
                // Serialization failure degrades to an empty string rather than
                // erroring the whole query.
                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes grouped by chain");
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Registry goes into schema data so cube resolvers can look cubes up by name.
    builder = builder.data(registry);

    if let Some(transform) = config.table_name_transform.clone() {
        builder = builder.data(transform);
    }

    builder.finish()
}
327
/// Serialize one cube definition into the JSON shape emitted by `_cubeMetadata`:
/// name/description/table info plus metrics, selectors, dimensions, joins,
/// table routes, and limits. Optional metric fields (`expressionTemplate`,
/// `description`) are only included when present.
fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
    serde_json::json!({
        "name": cube.name,
        "description": cube.description,
        "schema": cube.schema,
        "tablePattern": cube.table_pattern,
        // Enum variants are rendered via Debug (e.g. "Evm", "Solana").
        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
        "metrics": cube.metrics.iter().map(|m| {
            let mut obj = serde_json::json!({
                "name": m.name,
                "returnType": format!("{:?}", m.return_type),
            });
            if let Some(ref tmpl) = m.expression_template {
                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
            }
            if let Some(ref desc) = m.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        }).collect::<Vec<_>>(),
        "selectors": cube.selectors.iter().map(|s| {
            serde_json::json!({
                "name": s.graphql_name,
                "column": s.column,
                "type": format!("{:?}", s.dim_type),
            })
        }).collect::<Vec<_>>(),
        // Dimensions are a tree (leaf/group/array); serialized recursively.
        "dimensions": serialize_dims(&cube.dimensions),
        "joins": cube.joins.iter().map(|j| {
            serde_json::json!({
                "field": j.field_name,
                "target": j.target_cube,
                "joinType": format!("{:?}", j.join_type),
            })
        }).collect::<Vec<_>>(),
        "tableRoutes": cube.table_routes.iter().map(|r| {
            serde_json::json!({
                "schema": r.schema,
                "tablePattern": r.table_pattern,
                "availableColumns": r.available_columns,
                "priority": r.priority,
            })
        }).collect::<Vec<_>>(),
        "defaultLimit": cube.default_limit,
        "maxLimit": cube.max_limit,
    })
}
375
/// Build a single cube Field that reads `network` from the parent ChainContext.
///
/// The resolver: parses the GraphQL selection/arguments into IR, attaches join
/// expressions for selected join fields, applies the optional table-name
/// transform, validates and compiles the IR to SQL, runs it through the
/// injected executor, post-processes rows (alias remap, join sub-object
/// nesting), fires the stats callback, and returns the rows as field values.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    // Cloned up front because the resolver closure must be 'static.
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // Parent value is the ChainContext stored by the wrapper resolver.
                let chain_ctx = ctx.parent_value
                    .try_downcast_ref::<ChainContext>().ok();
                // NOTE(review): falls back to "sol" when no ChainContext is
                // present — confirm this default is intended for all groups.
                let network = chain_ctx
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Gather everything the parser needs from the selection set.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // Attach a JoinExpr for each selected field that matches a join
                // definition; unknown target cubes are silently skipped.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                // Optional service-layer table-name transform (e.g. dataset suffix).
                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
                    if let Some(cctx) = chain_ctx {
                        ir.table = transform(&ir.table, cctx);
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Map dialect-generated SQL aliases back to their original keys.
                // `or_insert` keeps an existing value if the original key is
                // already present in the row.
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Nest join columns ("<alias>.<col>") into a JSON object under
                // the join's GraphQL field name.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            // Collect keys first to avoid mutating while iterating.
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // A callback registered in schema data takes precedence over the
                // one captured from SchemaConfig.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    // Standard cube arguments: filter, pagination, per-group limit, ordering.
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named(format!("{}LimitByInput", cube.name)))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // One shorthand filter argument per selector dimension.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
531
/// Recursively serialize a dimension tree to JSON for `_cubeMetadata`.
/// Leaves carry name/column/type; groups recurse into `children`; array
/// dimensions describe each field's scalar-or-union type. Optional
/// descriptions are only included when present.
fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
    serde_json::Value::Array(dims.iter().map(|d| match d {
        DimensionNode::Leaf(dim) => {
            let mut obj = serde_json::json!({
                "name": dim.graphql_name,
                "column": dim.column,
                "type": format!("{:?}", dim.dim_type),
            });
            if let Some(desc) = &dim.description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        DimensionNode::Group { graphql_name, description, children } => {
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "children": serialize_dims(children),
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
        DimensionNode::Array { graphql_name, description, children } => {
            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
                // An array field is either a scalar of a known dim type or a
                // union of typed variants.
                let type_value = match &f.field_type {
                    crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
                        "kind": "scalar",
                        "scalarType": format!("{:?}", dt),
                    }),
                    crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
                        "kind": "union",
                        "variants": variants.iter().map(|v| serde_json::json!({
                            "typeName": v.type_name,
                            "fieldName": v.field_name,
                            "sourceType": format!("{:?}", v.source_type),
                        })).collect::<Vec<_>>(),
                    }),
                };
                let mut field_obj = serde_json::json!({
                    "name": f.graphql_name,
                    "column": f.column,
                    "type": type_value,
                });
                if let Some(desc) = &f.description {
                    field_obj["description"] = serde_json::Value::String(desc.clone());
                }
                field_obj
            }).collect();
            let mut obj = serde_json::json!({
                "name": graphql_name,
                "kind": "array",
                "fields": fields,
            });
            if let Some(desc) = description {
                obj["description"] = serde_json::Value::String(desc.clone());
            }
            obj
        },
    }).collect())
}
593
594/// Extract metric requests from the GraphQL selection set by inspecting
595/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
596/// the "count" field in the selection set and extract its `of` argument.
597fn extract_metric_requests(
598    ctx: &async_graphql::dynamic::ResolverContext,
599    cube: &CubeDefinition,
600) -> Vec<compiler::parser::MetricRequest> {
601    let mut requests = Vec::new();
602
603    for sub_field in ctx.ctx.field().selection_set() {
604        let name = sub_field.name();
605        if !cube.has_metric(name) {
606            continue;
607        }
608
609        let args = match sub_field.arguments() {
610            Ok(args) => args,
611            Err(_) => continue,
612        };
613
614        let of_dimension = args
615            .iter()
616            .find(|(k, _)| k.as_str() == "of")
617            .and_then(|(_, v)| match v {
618                async_graphql::Value::Enum(e) => Some(e.to_string()),
619                async_graphql::Value::String(s) => Some(s.clone()),
620                _ => None,
621            })
622            .unwrap_or_else(|| "*".to_string());
623
624        let select_where_value = args
625            .iter()
626            .find(|(k, _)| k.as_str() == "selectWhere")
627            .map(|(_, v)| v.clone());
628
629        let condition_filter = args
630            .iter()
631            .find(|(k, _)| k.as_str() == "if")
632            .and_then(|(_, v)| {
633                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
634                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
635            });
636
637        let gql_alias = sub_field.alias().unwrap_or(name).to_string();
638        requests.push(compiler::parser::MetricRequest {
639            function: name.to_string(),
640            alias: gql_alias,
641            of_dimension,
642            select_where_value,
643            condition_filter,
644        });
645    }
646
647    requests
648}
649
650fn extract_requested_fields(
651    ctx: &async_graphql::dynamic::ResolverContext,
652    cube: &CubeDefinition,
653) -> HashSet<String> {
654    let mut fields = HashSet::new();
655    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
656    fields
657}
658
659fn collect_selection_paths(
660    field: &async_graphql::SelectionField<'_>,
661    prefix: &str,
662    out: &mut HashSet<String>,
663    metrics: &[MetricDef],
664) {
665    for sub in field.selection_set() {
666        let name = sub.name();
667        if metrics.iter().any(|m| m.name == name) {
668            continue;
669        }
670        let path = if prefix.is_empty() {
671            name.to_string()
672        } else {
673            format!("{prefix}_{name}")
674        };
675        let has_children = sub.selection_set().next().is_some();
676        if has_children {
677            collect_selection_paths(&sub, &path, out, metrics);
678        } else {
679            out.insert(path);
680        }
681    }
682}
683
/// Maps `{parent_path}_{graphql_alias}` → ClickHouse column name for aliased dimension fields.
/// Used by `calculate` to resolve `$TokenSupplyUpdate_post` → column.
/// Stored as an order-preserving Vec of pairs rather than a map.
pub type FieldAliasMap = Vec<(String, String)>;
687
/// Describes a `quantile(of: ..., level: ...)` request.
pub struct QuantileRequest {
    /// Output alias (GraphQL alias, or "quantile" when unaliased).
    pub alias: String,
    /// Dimension the quantile is computed over; "*" when unspecified.
    pub of_dimension: String,
    /// Quantile level (e.g. 0.5 for the median); extraction defaults to 0.5.
    pub level: f64,
}
694
/// Describes a `calculate(expression: "...")` request.
pub struct CalculateRequest {
    /// Output alias for the computed value.
    pub alias: String,
    /// Raw expression string as supplied by the client.
    pub expression: String,
}
700
/// Describes a time interval bucketing request.
/// `Time(interval: {in: minutes, count: 1})` → replaces column with `toStartOfInterval(ts, ...)`
pub struct TimeIntervalRequest {
    /// Flattened dimension path of the time field.
    pub field_path: String,
    /// GraphQL alias used as the output key.
    pub graphql_alias: String,
    /// Underlying timestamp column.
    pub column: String,
    /// Time unit name (mirrors the "TimeUnit" enum values, e.g. "minutes").
    pub unit: String,
    /// Number of units per bucket.
    pub count: i64,
}
710
/// Describes a dimension-level aggregation request extracted from the selection set.
/// `PostBalance(maximum: Block_Slot)` → DimAggRequest { agg_type: ArgMax, ... }
pub struct DimAggRequest {
    /// Flattened dimension path being aggregated.
    pub field_path: String,
    /// GraphQL alias used as the output key.
    pub graphql_alias: String,
    /// Column whose value the aggregate returns.
    pub value_column: String,
    /// Kind of dimension aggregation (see `DimAggType`).
    pub agg_type: DimAggType,
    /// Comparison column — presumably the argMax/argMin ordering column per
    /// the doc example above; confirm against the extractor.
    pub compare_column: String,
    /// Optional `if:` condition filter applied to the aggregate.
    pub condition_filter: Option<FilterNode>,
    /// Raw `selectWhere` value for post-aggregation filtering.
    pub select_where_value: Option<async_graphql::Value>,
}
722
723fn extract_quantile_requests(
724    ctx: &async_graphql::dynamic::ResolverContext,
725) -> Vec<QuantileRequest> {
726    let mut requests = Vec::new();
727    for sub_field in ctx.ctx.field().selection_set() {
728        if sub_field.name() != "quantile" { continue; }
729        let args = match sub_field.arguments() {
730            Ok(a) => a,
731            Err(_) => continue,
732        };
733        let of_dim = args.iter()
734            .find(|(k, _)| k.as_str() == "of")
735            .and_then(|(_, v)| match v {
736                async_graphql::Value::Enum(e) => Some(e.to_string()),
737                async_graphql::Value::String(s) => Some(s.clone()),
738                _ => None,
739            })
740            .unwrap_or_else(|| "*".to_string());
741        let level = args.iter()
742            .find(|(k, _)| k.as_str() == "level")
743            .and_then(|(_, v)| match v {
744                async_graphql::Value::Number(n) => n.as_f64(),
745                _ => None,
746            })
747            .unwrap_or(0.5);
748        let alias = sub_field.alias().unwrap_or("quantile").to_string();
749        requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
750    }
751    requests
752}
753
754fn extract_field_aliases(
755    ctx: &async_graphql::dynamic::ResolverContext,
756    cube: &CubeDefinition,
757) -> FieldAliasMap {
758    let flat = cube.flat_dimensions();
759    let mut aliases = Vec::new();
760    collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
761    aliases
762}
763
764fn collect_field_aliases(
765    field: &async_graphql::SelectionField<'_>,
766    prefix: &str,
767    flat: &[(String, crate::cube::definition::Dimension)],
768    metrics: &[MetricDef],
769    out: &mut FieldAliasMap,
770) {
771    for sub in field.selection_set() {
772        let name = sub.name();
773        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
774            continue;
775        }
776        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
777        let has_children = sub.selection_set().next().is_some();
778        if has_children {
779            collect_field_aliases(&sub, &path, flat, metrics, out);
780        } else if let Some(alias) = sub.alias() {
781            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
782                let alias_path = if prefix.is_empty() {
783                    alias.to_string()
784                } else {
785                    format!("{prefix}_{alias}")
786                };
787                out.push((alias_path, dim.column.clone()));
788            }
789        }
790    }
791}
792
793fn extract_calculate_requests(
794    ctx: &async_graphql::dynamic::ResolverContext,
795) -> Vec<CalculateRequest> {
796    let mut requests = Vec::new();
797    for sub_field in ctx.ctx.field().selection_set() {
798        if sub_field.name() != "calculate" { continue; }
799        let args = match sub_field.arguments() {
800            Ok(a) => a,
801            Err(_) => continue,
802        };
803        let expression = args.iter()
804            .find(|(k, _)| k.as_str() == "expression")
805            .and_then(|(_, v)| match v {
806                async_graphql::Value::String(s) => Some(s.clone()),
807                _ => None,
808            });
809        if let Some(expr) = expression {
810            let alias = sub_field.alias().unwrap_or("calculate").to_string();
811            requests.push(CalculateRequest { alias, expression: expr });
812        }
813    }
814    requests
815}
816
817fn extract_dim_agg_requests(
818    ctx: &async_graphql::dynamic::ResolverContext,
819    cube: &CubeDefinition,
820) -> Vec<DimAggRequest> {
821    let flat = cube.flat_dimensions();
822    let mut requests = Vec::new();
823    collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
824    requests
825}
826
827fn collect_dim_agg_paths(
828    field: &async_graphql::SelectionField<'_>,
829    prefix: &str,
830    flat: &[(String, crate::cube::definition::Dimension)],
831    metrics: &[MetricDef],
832    cube: &CubeDefinition,
833    out: &mut Vec<DimAggRequest>,
834) {
835    for sub in field.selection_set() {
836        let name = sub.name();
837        if metrics.iter().any(|m| m.name == name) {
838            continue;
839        }
840        let path = if prefix.is_empty() {
841            name.to_string()
842        } else {
843            format!("{prefix}_{name}")
844        };
845        let has_children = sub.selection_set().next().is_some();
846        if has_children {
847            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
848        } else {
849            let args = match sub.arguments() {
850                Ok(a) => a,
851                Err(_) => continue,
852            };
853            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
854            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
855
856            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
857                let cp = match v {
858                    async_graphql::Value::Enum(e) => e.to_string(),
859                    async_graphql::Value::String(s) => s.clone(),
860                    _ => continue,
861                };
862                (DimAggType::ArgMax, cp)
863            } else if let Some((_, v)) = min_val {
864                let cp = match v {
865                    async_graphql::Value::Enum(e) => e.to_string(),
866                    async_graphql::Value::String(s) => s.clone(),
867                    _ => continue,
868                };
869                (DimAggType::ArgMin, cp)
870            } else {
871                continue;
872            };
873
874            let value_column = flat.iter()
875                .find(|(p, _)| p == &path)
876                .map(|(_, dim)| dim.column.clone());
877            let compare_column = flat.iter()
878                .find(|(p, _)| p == &compare_path)
879                .map(|(_, dim)| dim.column.clone());
880
881            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
882                let condition_filter = args.iter()
883                    .find(|(k, _)| k.as_str() == "if")
884                    .and_then(|(_, v)| {
885                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
886                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
887                    });
888                let select_where_value = args.iter()
889                    .find(|(k, _)| k.as_str() == "selectWhere")
890                    .map(|(_, v)| v.clone());
891
892                let gql_alias = sub.alias()
893                    .map(|a| a.to_string())
894                    .unwrap_or_else(|| path.clone());
895                out.push(DimAggRequest {
896                    field_path: path,
897                    graphql_alias: gql_alias,
898                    value_column: vc,
899                    agg_type,
900                    compare_column: cc,
901                    condition_filter,
902                    select_where_value,
903                });
904            }
905        }
906    }
907}
908
909fn extract_time_interval_requests(
910    ctx: &async_graphql::dynamic::ResolverContext,
911    cube: &CubeDefinition,
912) -> Vec<TimeIntervalRequest> {
913    let flat = cube.flat_dimensions();
914    let mut requests = Vec::new();
915    collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
916    requests
917}
918
919fn collect_time_interval_paths(
920    field: &async_graphql::SelectionField<'_>,
921    prefix: &str,
922    flat: &[(String, crate::cube::definition::Dimension)],
923    metrics: &[MetricDef],
924    out: &mut Vec<TimeIntervalRequest>,
925) {
926    for sub in field.selection_set() {
927        let name = sub.name();
928        if metrics.iter().any(|m| m.name == name) { continue; }
929        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
930        let has_children = sub.selection_set().next().is_some();
931        if has_children {
932            collect_time_interval_paths(&sub, &path, flat, metrics, out);
933        } else {
934            let args = match sub.arguments() {
935                Ok(a) => a,
936                Err(_) => continue,
937            };
938            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
939            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
940                let unit = obj.get("in")
941                    .and_then(|v| match v {
942                        async_graphql::Value::Enum(e) => Some(e.to_string()),
943                        async_graphql::Value::String(s) => Some(s.clone()),
944                        _ => None,
945                    });
946                let count = obj.get("count")
947                    .and_then(|v| match v {
948                        async_graphql::Value::Number(n) => n.as_i64(),
949                        _ => None,
950                    });
951                if let (Some(unit), Some(count)) = (unit, count) {
952                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
953                        let gql_alias = sub.alias().unwrap_or(name).to_string();
954                        out.push(TimeIntervalRequest {
955                            field_path: path,
956                            graphql_alias: gql_alias,
957                            column: dim.column.clone(),
958                            unit,
959                            count,
960                        });
961                    }
962                }
963            }
964        }
965    }
966}
967
968// ---------------------------------------------------------------------------
969// Per-Cube GraphQL type generation
970// ---------------------------------------------------------------------------
971
/// All GraphQL types generated for one cube: the record object plus every
/// nested/auxiliary input, enum, and union it needs for schema registration.
struct CubeTypes {
    /// The `{Cube}Record` object plus nested group/element objects.
    objects: Vec<Object>,
    /// Filter, orderBy, limitBy, and per-metric selectWhere inputs.
    inputs: Vec<InputObject>,
    /// The compare-fields enum plus per-metric/quantile `of` enums.
    enums: Vec<Enum>,
    /// Union types generated for array fields with variant payloads.
    unions: Vec<Union>,
}
978
/// Build every GraphQL type needed for one cube: the `{Cube}Record` object
/// (dimension, metric, quantile/calculate, and join fields), the
/// `{Cube}Filter` input, orderBy/limitBy inputs, and all supporting
/// enums/objects/unions discovered while walking the dimension tree.
fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
    // Canonical generated type names, all namespaced by the cube name.
    let record_name = format!("{}Record", cube.name);
    let filter_name = format!("{}Filter", cube.name);
    let compare_enum_name = format!("{}CompareFields", cube.name);

    let flat_dims = cube.flat_dimensions();

    // Enum of every flattened dimension path — used by maximum/minimum
    // arguments and the orderBy/limitBy inputs below.
    let mut compare_enum = Enum::new(&compare_enum_name)
        .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
    for (path, _) in &flat_dims {
        compare_enum = compare_enum.item(EnumItem::new(path));
    }

    let mut record_fields: Vec<Field> = Vec::new();
    let mut filter_fields: Vec<InputValue> = Vec::new();
    let mut extra_objects: Vec<Object> = Vec::new();
    let mut extra_inputs: Vec<InputObject> = Vec::new();
    let mut extra_unions: Vec<Union> = Vec::new();

    // `any:` OR-combinator takes a list of the cube's own filter type.
    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
        .description("OR combinator — matches if any sub-filter matches"));

    // Walk the dimension tree, accumulating record/filter fields and nested
    // types into the vectors above (block-scoped so the &mut borrows end).
    {
        let mut collector = DimCollector {
            cube_name: &cube.name,
            compare_enum_name: &compare_enum_name,
            filter_name: &filter_name,
            record_fields: &mut record_fields,
            filter_fields: &mut filter_fields,
            extra_objects: &mut extra_objects,
            extra_inputs: &mut extra_inputs,
            extra_unions: &mut extra_unions,
        };
        for node in &cube.dimensions {
            collect_dimension_types(node, "", &mut collector);
        }
    }

    let mut metric_enums: Vec<Enum> = Vec::new();
    // Fallback descriptions for built-in metric names lacking an explicit one.
    let builtin_descs: std::collections::HashMap<&str, &str> = [
        ("count", "Count of rows or distinct values"),
        ("sum", "Sum of values"),
        ("avg", "Average of values"),
        ("min", "Minimum value"),
        ("max", "Maximum value"),
        ("uniq", "Count of unique (distinct) values"),
    ].into_iter().collect();

    for metric in &cube.metrics {
        let metric_name = &metric.name;
        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);

        // Per-metric HAVING-style input; bounds are string-typed so any
        // numeric representation can be passed through.
        if metric.supports_where {
            extra_inputs.push(
                InputObject::new(&select_where_name)
                    .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
                    .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
                    .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
                    .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
                    .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
                    .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
            );
        }

        // Per-metric `of:` enum listing every flattened dimension path.
        let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
        let mut of_enum = Enum::new(&of_enum_name)
            .description(format!("Dimension to apply {} aggregation on", metric_name));
        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
        metric_enums.push(of_enum);

        let metric_name_clone = metric_name.clone();
        let return_type_ref = dim_type_to_typeref(&metric.return_type);
        let metric_desc = metric.description.as_deref()
            .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
            .unwrap_or("Aggregate metric");

        // Resolver reads the metric value from the parent RowMap under the
        // canonical `metric_key(alias)` key produced by the SQL compiler.
        let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
            let default_name = metric_name_clone.clone();
            FieldFuture::new(async move {
                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
                let key = metric_key(alias);
                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                Ok(Some(FieldValue::value(json_to_gql_value(val))))
            })
        })
        .description(metric_desc)
        .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
            .description("Dimension to aggregate on (default: all rows)"));

        if metric.supports_where {
            metric_field = metric_field
                .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
                    .description("Post-aggregation filter (HAVING)"))
                .argument(InputValue::new("if", TypeRef::named(&filter_name))
                    .description("Conditional filter for this metric"));
        }

        record_fields.push(metric_field);
    }

    // `quantile(of: ..., level: ...)` — percentile computation
    {
        let of_enum_name = format!("{}_quantile_Of", cube.name);
        let mut of_enum = Enum::new(&of_enum_name)
            .description(format!("Dimension to apply quantile on for {}", cube.name));
        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
        metric_enums.push(of_enum);

        // NOTE(review): despite the name, this clone is not captured by the
        // resolver closure — it is only borrowed for the `of` argument type
        // ref below; `&of_enum_name` would work without the clone.
        let of_enum_for_closure = of_enum_name.clone();
        let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
            FieldFuture::new(async move {
                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                let alias = ctx.ctx.field().alias().unwrap_or("quantile");
                let key = metric_key(alias);
                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                Ok(Some(FieldValue::value(json_to_gql_value(val))))
            })
        })
        .description("Compute a quantile (percentile) of a dimension")
        .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
            .description("Dimension to compute quantile on"))
        .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
            .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
        record_fields.push(quantile_field);
    }

    // `calculate(expression: "...")` — runtime expression computation
    {
        let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
            FieldFuture::new(async move {
                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                let alias = ctx.ctx.field().alias().unwrap_or("calculate");
                let key = metric_key(alias);
                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                Ok(Some(FieldValue::value(json_to_gql_value(val))))
            })
        })
        .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
        .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
            .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
        record_fields.push(calculate_field);
    }

    // Add join fields: joinXxx returns the target cube's Record type
    for jd in &cube.joins {
        let target_record_name = format!("{}Record", jd.target_cube);
        let field_name_owned = jd.field_name.clone();
        let mut join_field = Field::new(
            &jd.field_name,
            TypeRef::named(&target_record_name),
            move |ctx| {
                let field_name = field_name_owned.clone();
                FieldFuture::new(async move {
                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                    // Joined rows arrive as a nested JSON object under the join
                    // field's key; re-wrap it as a RowMap for child resolvers.
                    if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
                        let sub_row: RowMap = obj.iter()
                            .map(|(k, v)| (k.clone(), v.clone()))
                            .collect();
                        Ok(Some(FieldValue::owned_any(sub_row)))
                    } else {
                        Ok(Some(FieldValue::value(Value::Null)))
                    }
                })
            },
        );
        if let Some(desc) = &jd.description {
            join_field = join_field.description(desc);
        }
        record_fields.push(join_field);
    }

    // Assemble the record object and filter input from the collected fields.
    let mut record = Object::new(&record_name);
    for f in record_fields { record = record.field(f); }

    let mut filter = InputObject::new(&filter_name)
        .description(format!("Filter conditions for {} query", cube.name));
    for f in filter_fields { filter = filter.field(f); }

    // Bitquery-compatible ordering input: enum-based for dimensions, string
    // field names for computed/aggregated values.
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let orderby_input = InputObject::new(&orderby_input_name)
        .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
        .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
            .description("Sort descending by this field"))
        .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
            .description("Sort ascending by this field"))
        .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
            .description("Sort descending by computed/aggregated field name"))
        .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
            .description("Sort ascending by computed/aggregated field name"));

    // LIMIT BY input: caps rows per group of a chosen dimension.
    let limitby_input_name = format!("{}LimitByInput", cube.name);
    let limitby_input = InputObject::new(&limitby_input_name)
        .description(format!("Per-group row limit for {}", cube.name))
        .field(InputValue::new("by", TypeRef::named_nn(&compare_enum_name))
            .description("Dimension field to group by"))
        .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
            .description("Maximum rows per group"))
        .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
            .description("Rows to skip per group"));

    let mut objects = vec![record]; objects.extend(extra_objects);
    let mut inputs = vec![filter, orderby_input, limitby_input]; inputs.extend(extra_inputs);
    let mut enums = vec![compare_enum]; enums.extend(metric_enums);

    CubeTypes { objects, inputs, enums, unions: extra_unions }
}
1186
/// Mutable accumulator threaded through `collect_dimension_types` while
/// walking a cube's dimension tree.
struct DimCollector<'a> {
    /// Cube name, used to namespace generated nested type names.
    cube_name: &'a str,
    /// Name of the cube-wide compare-fields enum (maximum/minimum arguments).
    compare_enum_name: &'a str,
    /// Name of the cube's top-level filter input (used for `if:` arguments).
    filter_name: &'a str,
    /// Fields accumulated for the current record object.
    record_fields: &'a mut Vec<Field>,
    /// Fields accumulated for the current filter input object.
    filter_fields: &'a mut Vec<InputValue>,
    /// Nested record/element objects discovered along the way.
    extra_objects: &'a mut Vec<Object>,
    /// Nested filter/includes input objects discovered along the way.
    extra_inputs: &'a mut Vec<InputObject>,
    /// Union types discovered for array variant fields.
    extra_unions: &'a mut Vec<Union>,
}
1197
1198fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1199    match node {
1200        DimensionNode::Leaf(dim) => {
1201            let col = dim.column.clone();
1202            let is_datetime = dim.dim_type == DimType::DateTime;
1203            let compare_enum = c.compare_enum_name.to_string();
1204            let cube_filter = c.filter_name.to_string();
1205            let mut leaf_field = Field::new(
1206                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1207                move |ctx| {
1208                    let col = col.clone();
1209                    FieldFuture::new(async move {
1210                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1211                        let has_interval = ctx.args.try_get("interval").is_ok();
1212                        let has_max = ctx.args.try_get("maximum").is_ok();
1213                        let has_min = ctx.args.try_get("minimum").is_ok();
1214                        let key = if has_interval || has_max || has_min {
1215                            let name = ctx.ctx.field().alias().unwrap_or(&col);
1216                            dim_agg_key(name)
1217                        } else {
1218                            col.clone()
1219                        };
1220                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1221                        let gql_val = if is_datetime {
1222                            json_to_gql_datetime(val)
1223                        } else {
1224                            json_to_gql_value(val)
1225                        };
1226                        Ok(Some(FieldValue::value(gql_val)))
1227                    })
1228                },
1229            );
1230            if let Some(desc) = &dim.description {
1231                leaf_field = leaf_field.description(desc);
1232            }
1233            leaf_field = leaf_field
1234                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1235                    .description("Return value from row where compare field is maximum (argMax)"))
1236                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1237                    .description("Return value from row where compare field is minimum (argMin)"))
1238                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1239                    .description("Conditional filter for aggregation"))
1240                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1241                    .description("Post-aggregation value filter (HAVING)"));
1242            if is_datetime {
1243                leaf_field = leaf_field
1244                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1245                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1246            }
1247            // Update resolver key: if interval is present, read from the interval expression key
1248            // This is handled by the resolver checking ctx.args for "interval"
1249            c.record_fields.push(leaf_field);
1250            if matches!(dim.dim_type, DimType::Bool) {
1251                c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(TypeRef::BOOLEAN)));
1252            } else {
1253                c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1254            }
1255        }
1256        DimensionNode::Group { graphql_name, description, children } => {
1257            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1258            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1259            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1260
1261            let mut child_record_fields: Vec<Field> = Vec::new();
1262            let mut child_filter_fields: Vec<InputValue> = Vec::new();
1263            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1264
1265            let mut child_collector = DimCollector {
1266                cube_name: c.cube_name,
1267                compare_enum_name: c.compare_enum_name,
1268                filter_name: c.filter_name,
1269                record_fields: &mut child_record_fields,
1270                filter_fields: &mut child_filter_fields,
1271                extra_objects: c.extra_objects,
1272                extra_inputs: c.extra_inputs,
1273                extra_unions: c.extra_unions,
1274            };
1275            for child in children {
1276                collect_dimension_types(child, &new_prefix, &mut child_collector);
1277            }
1278
1279            let mut nested_record = Object::new(&nested_record_name);
1280            for f in child_record_fields { nested_record = nested_record.field(f); }
1281
1282            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1283            let mut nested_filter = InputObject::new(&nested_filter_name)
1284                .description(nested_filter_desc);
1285            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1286
1287            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1288                FieldFuture::new(async move {
1289                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1290                    Ok(Some(FieldValue::owned_any(row.clone())))
1291                })
1292            });
1293            if let Some(desc) = description {
1294                group_field = group_field.description(desc);
1295            }
1296            c.record_fields.push(group_field);
1297            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1298            c.extra_objects.push(nested_record);
1299            c.extra_inputs.push(nested_filter);
1300        }
1301        DimensionNode::Array { graphql_name, description, children } => {
1302            let full_path = if prefix.is_empty() {
1303                graphql_name.clone()
1304            } else {
1305                format!("{prefix}_{graphql_name}")
1306            };
1307            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1308            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1309
1310            let mut element_obj = Object::new(&element_type_name);
1311            let mut includes_filter = InputObject::new(&includes_filter_name)
1312                .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1313
1314            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1315
1316            for child in children {
1317                match &child.field_type {
1318                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1319                        let col_name = child.column.clone();
1320                        let is_datetime = *dt == DimType::DateTime;
1321                        let mut field = Field::new(
1322                            &child.graphql_name,
1323                            dim_type_to_typeref(dt),
1324                            move |ctx| {
1325                                let col = col_name.clone();
1326                                FieldFuture::new(async move {
1327                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1328                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1329                                    let gql_val = if is_datetime {
1330                                        json_to_gql_datetime(val)
1331                                    } else {
1332                                        json_to_gql_value(val)
1333                                    };
1334                                    Ok(Some(FieldValue::value(gql_val)))
1335                                })
1336                            },
1337                        );
1338                        if let Some(desc) = &child.description {
1339                            field = field.description(desc);
1340                        }
1341                        element_obj = element_obj.field(field);
1342                        includes_filter = includes_filter.field(
1343                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1344                        );
1345                    }
1346                    crate::cube::definition::ArrayFieldType::Union(variants) => {
1347                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1348
1349                        let mut gql_union = Union::new(&union_name);
1350                        let mut variant_objects = Vec::new();
1351
1352                        for v in variants {
1353                            let variant_obj_name = v.type_name.clone();
1354                            let field_name = v.field_name.clone();
1355                            let source_type = v.source_type.clone();
1356                            let type_ref = dim_type_to_typeref(&source_type);
1357
1358                            let val_col = child.column.clone();
1359                            let variant_obj = Object::new(&variant_obj_name)
1360                                .field(Field::new(
1361                                    &field_name,
1362                                    type_ref,
1363                                    move |ctx| {
1364                                        let col = val_col.clone();
1365                                        FieldFuture::new(async move {
1366                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1367                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1368                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
1369                                        })
1370                                    },
1371                                ));
1372                            gql_union = gql_union.possible_type(&variant_obj_name);
1373                            variant_objects.push(variant_obj);
1374                        }
1375
1376                        let col_name = child.column.clone();
1377                        let type_col = children.iter()
1378                            .find(|f| f.graphql_name == "Type")
1379                            .map(|f| f.column.clone())
1380                            .unwrap_or_default();
1381                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1382
1383                        let mut field = Field::new(
1384                            &child.graphql_name,
1385                            TypeRef::named(&union_name),
1386                            move |ctx| {
1387                                let col = col_name.clone();
1388                                let tcol = type_col.clone();
1389                                let vars = variants_clone.clone();
1390                                FieldFuture::new(async move {
1391                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1392                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1393                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
1394                                        return Ok(None);
1395                                    }
1396                                    let type_str = row.get(&tcol)
1397                                        .and_then(|v| v.as_str())
1398                                        .unwrap_or("");
1399                                    let resolved_type = resolve_union_typename(type_str, &vars);
1400                                    let mut elem = RowMap::new();
1401                                    elem.insert(col.clone(), raw_val);
1402                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1403                                })
1404                            },
1405                        );
1406                        if let Some(desc) = &child.description {
1407                            field = field.description(desc);
1408                        }
1409                        element_obj = element_obj.field(field);
1410
1411                        includes_filter = includes_filter.field(
1412                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1413                        );
1414
1415                        union_registrations.push((union_name, gql_union, variant_objects));
1416                    }
1417                }
1418            }
1419
1420            // Register union types and variant objects
1421            for (_, union_type, variant_objs) in union_registrations {
1422                c.extra_objects.extend(variant_objs);
                // The dynamic API requires unions (and their variant objects) to
                // be registered when the schema is built, so variant objects are
                // collected into `extra_objects` and the union itself into
                // `extra_unions`; the schema builder registers both later.
1430                c.extra_unions.push(union_type);
1431            }
1432
1433            // Build the list resolver: zip parallel array columns into element objects
1434            let child_columns: Vec<(String, String)> = children.iter()
1435                .map(|f| (f.graphql_name.clone(), f.column.clone()))
1436                .collect();
1437            let element_type_name_clone = element_type_name.clone();
1438
1439            let mut array_field = Field::new(
1440                graphql_name,
1441                TypeRef::named_nn_list_nn(&element_type_name),
1442                move |ctx| {
1443                    let cols = child_columns.clone();
1444                    let _etype = element_type_name_clone.clone();
1445                    FieldFuture::new(async move {
1446                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1447                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1448                            .map(|(gql_name, col)| {
1449                                let arr = row.get(col)
1450                                    .and_then(|v| v.as_array())
1451                                    .cloned()
1452                                    .unwrap_or_default();
1453                                (gql_name.as_str(), arr)
1454                            })
1455                            .collect();
1456
1457                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1458                        let mut elements = Vec::with_capacity(len);
1459                        for i in 0..len {
1460                            let mut elem = RowMap::new();
1461                            for (gql_name, arr) in &arrays {
1462                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1463                                elem.insert(gql_name.to_string(), val);
1464                            }
1465                            // Also store by column name for Union resolvers
1466                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1467                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1468                                elem.insert(col.clone(), val);
1469                            }
1470                            elements.push(FieldValue::owned_any(elem));
1471                        }
1472                        Ok(Some(FieldValue::list(elements)))
1473                    })
1474                },
1475            );
1476            if let Some(desc) = description {
1477                array_field = array_field.description(desc);
1478            }
1479            c.record_fields.push(array_field);
1480
1481            // Wrap the element-level IncludesFilter in an ArrayFilter so the
1482            // parser can find the `includes` key (compiler/filter.rs expects
1483            // `{ includes: [{ Name: { is: "..." } }] }`).
1484            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
1485            let wrapper_filter = InputObject::new(&wrapper_filter_name)
1486                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
1487                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
1488                    .description("Match rows where at least one array element satisfies all conditions"));
1489            c.extra_inputs.push(wrapper_filter);
1490
1491            c.filter_fields.push(InputValue::new(
1492                graphql_name,
1493                TypeRef::named(&wrapper_filter_name),
1494            ));
1495
1496            c.extra_objects.push(element_obj);
1497            c.extra_inputs.push(includes_filter);
1498        }
1499    }
1500}
1501
1502/// Resolve the GraphQL Union __typename from a discriminator column value.
1503///
1504/// Walks variants in order; the first whose `source_type_names` contains
1505/// `type_str` wins.  If none match, the last variant is used as fallback
1506/// (conventionally the "catch-all" variant such as JSON).
1507fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1508    for v in variants {
1509        if v.source_type_names.iter().any(|s| s == type_str) {
1510            return v.type_name.clone();
1511        }
1512    }
1513    variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1514}
1515
1516fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1517    match dt {
1518        DimType::String | DimType::DateTime | DimType::Decimal => TypeRef::named(TypeRef::STRING),
1519        DimType::Int => TypeRef::named(TypeRef::INT),
1520        DimType::Float => TypeRef::named(TypeRef::FLOAT),
1521        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1522    }
1523}
1524
1525fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1526    match dt {
1527        DimType::String => "StringFilter",
1528        DimType::Int => "IntFilter",
1529        DimType::Float => "FloatFilter",
1530        DimType::Decimal => "DecimalFilter",
1531        DimType::DateTime => "DateTimeFilter",
1532        DimType::Bool => "BoolFilter",
1533    }
1534}
1535
1536pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1537    match v {
1538        serde_json::Value::Null => Value::Null,
1539        serde_json::Value::Bool(b) => Value::from(b),
1540        serde_json::Value::Number(n) => {
1541            if let Some(i) = n.as_i64() { Value::from(i) }
1542            else if let Some(f) = n.as_f64() { Value::from(f) }
1543            else { Value::from(n.to_string()) }
1544        }
1545        serde_json::Value::String(s) => Value::from(s),
1546        _ => Value::from(v.to_string()),
1547    }
1548}
1549
1550/// Build a JoinExpr from a JoinDef and target cube definition.
1551/// Inspects the sub-selection of the join field to determine which columns to SELECT.
1552fn build_join_expr(
1553    jd: &crate::cube::definition::JoinDef,
1554    target_cube: &CubeDefinition,
1555    sub_field: &async_graphql::SelectionField<'_>,
1556    network: &str,
1557    join_idx: usize,
1558) -> JoinExpr {
1559    let target_flat = target_cube.flat_dimensions();
1560    let target_table = target_cube.table_for_chain(network);
1561
1562    let mut requested_paths = HashSet::new();
1563    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1564
1565    let mut selects: Vec<SelectExpr> = target_flat.iter()
1566        .filter(|(path, _)| requested_paths.contains(path))
1567        .map(|(_, dim)| SelectExpr::Column {
1568            column: dim.column.clone(),
1569            alias: None,
1570        })
1571        .collect();
1572
1573    if selects.is_empty() {
1574        selects = target_flat.iter()
1575            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1576            .collect();
1577    }
1578
1579    let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
1580
1581    let group_by = if is_aggregate {
1582        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1583        for sel in &selects {
1584            if let SelectExpr::Column { column, .. } = sel {
1585                if !column.contains('(') && !gb.contains(column) {
1586                    gb.push(column.clone());
1587                }
1588            }
1589        }
1590        gb
1591    } else {
1592        vec![]
1593    };
1594
1595    JoinExpr {
1596        schema: target_cube.schema.clone(),
1597        table: target_table,
1598        alias: format!("_j{}", join_idx),
1599        conditions: jd.conditions.clone(),
1600        selects,
1601        group_by,
1602        use_final: target_cube.use_final,
1603        is_aggregate,
1604        target_cube: jd.target_cube.clone(),
1605        join_field: sub_field.name().to_string(),
1606        join_type: jd.join_type.clone(),
1607    }
1608}
1609
1610/// Convert a ClickHouse DateTime value to ISO 8601 format.
1611/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
1612fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1613    match v {
1614        serde_json::Value::String(s) => {
1615            let iso = if s.contains('T') {
1616                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1617            } else {
1618                let replaced = s.replacen(' ', "T", 1);
1619                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1620            };
1621            Value::from(iso)
1622        }
1623        other => json_to_gql_value(other),
1624    }
1625}