Skip to main content

activecube_rs/schema/
generator.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
15/// Canonical key naming for metric SQL aliases. Used by parser and resolver.
16pub fn metric_key(alias: &str) -> String { format!("__{alias}") }
17
18/// Canonical key naming for dimension aggregate / time interval SQL aliases.
19pub fn dim_agg_key(alias: &str) -> String { format!("__da_{alias}") }
20
21/// Async function type that executes a compiled SQL query and returns rows.
22/// The service layer provides this — the library never touches a database directly.
23pub type QueryExecutor = Arc<
24    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
25        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
26    > + Send + Sync,
27>;
28
29/// A custom argument on the chain wrapper field (e.g. dataset, aggregates).
30/// The library registers it on the GraphQL wrapper type and stores the resolved
31/// value in `ChainContext.extra` for child cube resolvers to read.
32#[derive(Debug, Clone)]
33pub struct WrapperArg {
34    /// GraphQL argument name (e.g. "dataset").
35    pub name: String,
36    /// Name of a type already registered in the schema (e.g. "Dataset").
37    pub type_ref: String,
38    /// Description shown in schema introspection.
39    pub description: String,
40    /// Enum values, used by builder schema generation. None for non-enum types.
41    pub values: Option<Vec<String>>,
42}
43
44/// Describes one top-level chain wrapper type (e.g. EVM, Solana, Trading).
45#[derive(Debug, Clone)]
46pub struct ChainGroupConfig {
47    /// Wrapper type name in the schema, e.g. "EVM", "Solana", "Trading".
48    pub name: String,
49    /// Which ChainGroup enum value this config represents.
50    pub group: ChainGroup,
51    /// Available networks. EVM: ["eth","bsc"]; Solana: ["sol"]; Trading: all chains.
52    pub networks: Vec<String>,
53    /// If true the wrapper type takes a `network` argument (EVM style).
54    /// If false the chain is implicit (Solana style) or cross-chain (Trading).
55    pub has_network_arg: bool,
56    /// Additional arguments on the wrapper field, passed through to child resolvers
57    /// via `ChainContext.extra`.
58    pub extra_args: Vec<WrapperArg>,
59}
60
61/// Callback to transform the resolved table name based on `ChainContext`.
62/// Called after `resolve_table` picks the base table, before SQL compilation.
63/// Arguments: `(table_name, chain_context) -> transformed_table_name`.
64pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
65
66/// Configuration for supported networks (chains) and optional stats collection.
67pub struct SchemaConfig {
68    pub chain_groups: Vec<ChainGroupConfig>,
69    pub root_query_name: String,
70    /// Optional callback invoked after each cube query with execution metadata.
71    /// Used by application layer for billing, observability, etc.
72    pub stats_callback: Option<StatsCallback>,
73    /// Optional hook to transform table names based on chain context (e.g. dataset suffix).
74    pub table_name_transform: Option<TableNameTransform>,
75    /// Additional global enum types to register in the schema (e.g. Dataset, Aggregates).
76    pub extra_types: Vec<Enum>,
77}
78
79impl Default for SchemaConfig {
80    fn default() -> Self {
81        Self {
82            chain_groups: vec![
83                ChainGroupConfig {
84                    name: "EVM".to_string(),
85                    group: ChainGroup::Evm,
86                    networks: vec!["eth".into(), "bsc".into()],
87                    has_network_arg: true,
88                    extra_args: vec![],
89                },
90                ChainGroupConfig {
91                    name: "Solana".to_string(),
92                    group: ChainGroup::Solana,
93                    networks: vec!["sol".into()],
94                    has_network_arg: false,
95                    extra_args: vec![],
96                },
97                ChainGroupConfig {
98                    name: "Trading".to_string(),
99                    group: ChainGroup::Trading,
100                    networks: vec!["sol".into(), "eth".into(), "bsc".into()],
101                    has_network_arg: false,
102                    extra_args: vec![],
103                },
104            ],
105            root_query_name: "ChainStream".to_string(),
106            stats_callback: None,
107            table_name_transform: None,
108            extra_types: vec![],
109        }
110    }
111}
112
113/// Stored in the parent value of chain wrapper types so cube resolvers can
114/// retrieve the active chain without a `network` argument on themselves.
115/// Service-layer extra_args values are available in the `extra` map.
116pub struct ChainContext {
117    pub network: String,
118    pub extra: HashMap<String, String>,
119}
120
121/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
122///
123/// Schema shape (Bitquery-aligned):
124/// ```graphql
125/// type Query {
126///   EVM(network: EVMNetwork!): EVM!
127///   Solana: Solana!
128///   Trading: Trading!
129///   _cubeMetadata: String!
130/// }
131/// ```
132pub fn build_schema(
133    registry: CubeRegistry,
134    dialect: Arc<dyn SqlDialect>,
135    executor: QueryExecutor,
136    config: SchemaConfig,
137) -> Result<Schema, SchemaError> {
138    let mut builder = Schema::build("Query", None, None);
139
140    // --- shared input / enum types -------------------------------------------
141    builder = builder.register(filter_types::build_limit_input());
142    builder = builder.register(
143        InputObject::new("LimitByInput")
144            .description("Limit results per group (similar to ClickHouse LIMIT BY)")
145            .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
146                .description("Comma-separated dimension names to group by"))
147            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
148                .description("Maximum rows per group"))
149            .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
150                .description("Rows to skip per group")),
151    );
152    builder = builder.register(
153        Enum::new("OrderDirection")
154            .description("Sort direction")
155            .item(EnumItem::new("ASC").description("Ascending"))
156            .item(EnumItem::new("DESC").description("Descending")),
157    );
158    for input in filter_types::build_filter_primitives() {
159        builder = builder.register(input);
160    }
161    builder = builder.register(
162        Enum::new("TimeUnit")
163            .description("Time unit for interval bucketing")
164            .item(EnumItem::new("seconds"))
165            .item(EnumItem::new("minutes"))
166            .item(EnumItem::new("hours"))
167            .item(EnumItem::new("days"))
168            .item(EnumItem::new("weeks"))
169            .item(EnumItem::new("months")),
170    );
171    builder = builder.register(
172        InputObject::new("TimeIntervalInput")
173            .description("Time bucketing interval for DateTime dimensions")
174            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
175            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
176    );
177    builder = builder.register(
178        InputObject::new("DimSelectWhere")
179            .description("Post-aggregation filter for dimension values (HAVING clause)")
180            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
181            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
182            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
183            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
184            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
185            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
186    );
187
188    for extra_enum in config.extra_types {
189        builder = builder.register(extra_enum);
190    }
191
192    // --- per-group network enums ---------------------------------------------
193    for grp in &config.chain_groups {
194        if grp.has_network_arg {
195            let enum_name = format!("{}Network", grp.name);
196            let mut net_enum = Enum::new(&enum_name)
197                .description(format!("{} network selector", grp.name));
198            for net in &grp.networks {
199                net_enum = net_enum.item(EnumItem::new(net));
200            }
201            builder = builder.register(net_enum);
202        }
203    }
204
205    // --- register cube types once (shared across chain groups) ----------------
206    let mut registered_cubes: HashSet<String> = HashSet::new();
207    for cube in registry.cubes() {
208        if registered_cubes.contains(&cube.name) { continue; }
209        registered_cubes.insert(cube.name.clone());
210
211        let types = build_cube_types(cube);
212        for obj in types.objects { builder = builder.register(obj); }
213        for inp in types.inputs { builder = builder.register(inp); }
214        for en in types.enums { builder = builder.register(en); }
215        for un in types.unions { builder = builder.register(un); }
216    }
217
218    // --- build chain wrapper types + Query fields ----------------------------
219    let mut query = Object::new("Query");
220
221    for grp in &config.chain_groups {
222        let wrapper_type_name = grp.name.clone();
223
224        // Build the wrapper Object (e.g. "EVM", "Solana", "Trading") and attach
225        // cube fields that belong to this group.
226        let mut wrapper_obj = Object::new(&wrapper_type_name);
227
228        for cube in registry.cubes() {
229            if !cube.chain_groups.contains(&grp.group) { continue; }
230
231            let cube_field = build_cube_field(
232                cube,
233                dialect.clone(),
234                executor.clone(),
235                config.stats_callback.clone(),
236            );
237            wrapper_obj = wrapper_obj.field(cube_field);
238        }
239        builder = builder.register(wrapper_obj);
240
241        // Wrapper resolver on Query — stores ChainContext for child resolvers.
242        let default_network = grp.networks.first().cloned().unwrap_or_default();
243        let has_net_arg = grp.has_network_arg;
244        let net_enum_name = format!("{}Network", grp.name);
245        let wrapper_type_for_resolver = wrapper_type_name.clone();
246        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();
247
248        let mut wrapper_field = Field::new(
249            &wrapper_type_name,
250            TypeRef::named_nn(&wrapper_type_name),
251            move |ctx| {
252                let default = default_network.clone();
253                let has_arg = has_net_arg;
254                let arg_names = extra_arg_names.clone();
255                FieldFuture::new(async move {
256                    let network = if has_arg {
257                        ctx.args.try_get("network")
258                            .ok()
259                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
260                            .unwrap_or(default)
261                    } else {
262                        default
263                    };
264                    let mut extra = HashMap::new();
265                    for arg_name in &arg_names {
266                        if let Ok(val) = ctx.args.try_get(arg_name) {
267                            let resolved = val.enum_name().ok().map(|s| s.to_string())
268                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
269                                .or_else(|| val.string().ok().map(|s| s.to_string()));
270                            if let Some(v) = resolved {
271                                extra.insert(arg_name.clone(), v);
272                            }
273                        }
274                    }
275                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
276                })
277            },
278        );
279
280        if grp.has_network_arg {
281            wrapper_field = wrapper_field.argument(
282                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
283                    .description(format!("{} network to query", wrapper_type_for_resolver)),
284            );
285        }
286        for wa in &grp.extra_args {
287            wrapper_field = wrapper_field.argument(
288                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
289                    .description(&wa.description),
290            );
291        }
292
293        query = query.field(wrapper_field);
294    }
295
296    // --- _cubeMetadata -------------------------------------------------------
297    let metadata_registry = Arc::new(registry.clone());
298    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
299        .map(|g| (g.name.clone(), g.group.clone()))
300        .collect();
301    let metadata_field = Field::new(
302        "_cubeMetadata",
303        TypeRef::named_nn(TypeRef::STRING),
304        move |_ctx| {
305            let reg = metadata_registry.clone();
306            let groups = metadata_groups.clone();
307            FieldFuture::new(async move {
308                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
309                for (group_name, group_enum) in &groups {
310                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
311                        .filter(|c| c.chain_groups.contains(group_enum))
312                        .map(serialize_cube_metadata)
313                        .collect();
314                    group_metadata.push(serde_json::json!({
315                        "group": group_name,
316                        "cubes": cubes_in_group,
317                    }));
318                }
319                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
320                Ok(Some(FieldValue::value(Value::from(json))))
321            })
322        },
323    )
324    .description("Internal: returns JSON metadata about all cubes grouped by chain");
325    query = query.field(metadata_field);
326
327    builder = builder.register(query);
328    builder = builder.data(registry);
329
330    if let Some(transform) = config.table_name_transform.clone() {
331        builder = builder.data(transform);
332    }
333
334    builder.finish()
335}
336
337fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
338    serde_json::json!({
339        "name": cube.name,
340        "description": cube.description,
341        "schema": cube.schema,
342        "tablePattern": cube.table_pattern,
343        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
344        "metrics": cube.metrics.iter().map(|m| {
345            let mut obj = serde_json::json!({
346                "name": m.name,
347                "returnType": format!("{:?}", m.return_type),
348            });
349            if let Some(ref tmpl) = m.expression_template {
350                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
351            }
352            if let Some(ref desc) = m.description {
353                obj["description"] = serde_json::Value::String(desc.clone());
354            }
355            obj
356        }).collect::<Vec<_>>(),
357        "selectors": cube.selectors.iter().map(|s| {
358            serde_json::json!({
359                "name": s.graphql_name,
360                "column": s.column,
361                "type": format!("{:?}", s.dim_type),
362            })
363        }).collect::<Vec<_>>(),
364        "dimensions": serialize_dims(&cube.dimensions),
365        "joins": cube.joins.iter().map(|j| {
366            serde_json::json!({
367                "field": j.field_name,
368                "target": j.target_cube,
369                "joinType": format!("{:?}", j.join_type),
370            })
371        }).collect::<Vec<_>>(),
372        "tableRoutes": cube.table_routes.iter().map(|r| {
373            serde_json::json!({
374                "schema": r.schema,
375                "tablePattern": r.table_pattern,
376                "availableColumns": r.available_columns,
377                "priority": r.priority,
378            })
379        }).collect::<Vec<_>>(),
380        "defaultLimit": cube.default_limit,
381        "maxLimit": cube.max_limit,
382    })
383}
384
385/// Build a single cube Field that reads `network` from the parent ChainContext.
386fn build_cube_field(
387    cube: &CubeDefinition,
388    dialect: Arc<dyn SqlDialect>,
389    executor: QueryExecutor,
390    stats_cb: Option<StatsCallback>,
391) -> Field {
392    let cube_name = cube.name.clone();
393    let orderby_input_name = format!("{}OrderByInput", cube.name);
394    let cube_description = cube.description.clone();
395
396    let mut field = Field::new(
397        &cube.name,
398        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
399        move |ctx| {
400            let cube_name = cube_name.clone();
401            let dialect = dialect.clone();
402            let executor = executor.clone();
403            let stats_cb = stats_cb.clone();
404            FieldFuture::new(async move {
405                let registry = ctx.ctx.data::<CubeRegistry>()?;
406
407                let chain_ctx = ctx.parent_value
408                    .try_downcast_ref::<ChainContext>().ok();
409                let network = chain_ctx
410                    .map(|c| c.network.as_str())
411                    .unwrap_or("sol");
412
413                let cube_def = registry.get(&cube_name).ok_or_else(|| {
414                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
415                })?;
416
417                let metric_requests = extract_metric_requests(&ctx, cube_def);
418                let quantile_requests = extract_quantile_requests(&ctx);
419                let calculate_requests = extract_calculate_requests(&ctx);
420                let field_aliases = extract_field_aliases(&ctx, cube_def);
421                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
422                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
423                let requested = extract_requested_fields(&ctx, cube_def);
424                let mut ir = compiler::parser::parse_cube_query(
425                    cube_def,
426                    network,
427                    &ctx.args,
428                    &metric_requests,
429                    &quantile_requests,
430                    &calculate_requests,
431                    &field_aliases,
432                    &dim_agg_requests,
433                    &time_intervals,
434                    Some(requested),
435                )?;
436
437                let mut join_idx = 0usize;
438                for sub_field in ctx.ctx.field().selection_set() {
439                    let fname = sub_field.name().to_string();
440                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
441                    if let Some(jd) = join_def {
442                        if let Some(target_cube) = registry.get(&jd.target_cube) {
443                            let join_expr = build_join_expr(
444                                jd, target_cube, &sub_field, network, join_idx,
445                            );
446                            ir.joins.push(join_expr);
447                            join_idx += 1;
448                        }
449                    }
450                }
451
452                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
453                    if let Some(cctx) = chain_ctx {
454                        ir.table = transform(&ir.table, cctx);
455                    }
456                }
457
458                let validated = compiler::validator::validate(ir)?;
459                let result = dialect.compile(&validated);
460                let sql = result.sql;
461                let bindings = result.bindings;
462
463                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
464                    async_graphql::Error::new(format!("Query execution failed: {e}"))
465                })?;
466
467                let rows = if result.alias_remap.is_empty() {
468                    rows
469                } else {
470                    rows.into_iter().map(|mut row| {
471                        for (alias, original) in &result.alias_remap {
472                            if let Some(val) = row.shift_remove(alias) {
473                                row.entry(original.clone()).or_insert(val);
474                            }
475                        }
476                        row
477                    }).collect()
478                };
479
480                let rows: Vec<RowMap> = if validated.joins.is_empty() {
481                    rows
482                } else {
483                    rows.into_iter().map(|mut row| {
484                        for join in &validated.joins {
485                            let prefix = format!("{}.", join.alias);
486                            let mut sub_row = RowMap::new();
487                            let keys: Vec<String> = row.keys()
488                                .filter(|k| k.starts_with(&prefix))
489                                .cloned()
490                                .collect();
491                            for key in keys {
492                                if let Some(val) = row.shift_remove(&key) {
493                                    sub_row.insert(key[prefix.len()..].to_string(), val);
494                                }
495                            }
496                            let obj: serde_json::Map<String, serde_json::Value> =
497                                sub_row.into_iter().collect();
498                            row.insert(
499                                join.join_field.clone(),
500                                serde_json::Value::Object(obj),
501                            );
502                        }
503                        row
504                    }).collect()
505                };
506
507                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
508                    .or_else(|| stats_cb.clone());
509                if let Some(cb) = effective_cb {
510                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
511                    cb(stats);
512                }
513
514                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
515                Ok(Some(FieldValue::list(values)))
516            })
517        },
518    );
519    if !cube_description.is_empty() {
520        field = field.description(&cube_description);
521    }
522    field = field
523        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
524            .description("Filter conditions"))
525        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
526            .description("Pagination control"))
527        .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
528            .description("Per-group row limit"))
529        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
530            .description("Sort order (Bitquery-compatible)"));
531
532    for sel in &cube.selectors {
533        let filter_type = dim_type_to_filter_name(&sel.dim_type);
534        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
535            .description(format!("Shorthand filter for {}", sel.graphql_name)));
536    }
537
538    field
539}
540
541fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
542    serde_json::Value::Array(dims.iter().map(|d| match d {
543        DimensionNode::Leaf(dim) => {
544            let mut obj = serde_json::json!({
545                "name": dim.graphql_name,
546                "column": dim.column,
547                "type": format!("{:?}", dim.dim_type),
548            });
549            if let Some(desc) = &dim.description {
550                obj["description"] = serde_json::Value::String(desc.clone());
551            }
552            obj
553        },
554        DimensionNode::Group { graphql_name, description, children } => {
555            let mut obj = serde_json::json!({
556                "name": graphql_name,
557                "children": serialize_dims(children),
558            });
559            if let Some(desc) = description {
560                obj["description"] = serde_json::Value::String(desc.clone());
561            }
562            obj
563        },
564        DimensionNode::Array { graphql_name, description, children } => {
565            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
566                let type_value = match &f.field_type {
567                    crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
568                        "kind": "scalar",
569                        "scalarType": format!("{:?}", dt),
570                    }),
571                    crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
572                        "kind": "union",
573                        "variants": variants.iter().map(|v| serde_json::json!({
574                            "typeName": v.type_name,
575                            "fieldName": v.field_name,
576                            "sourceType": format!("{:?}", v.source_type),
577                        })).collect::<Vec<_>>(),
578                    }),
579                };
580                let mut field_obj = serde_json::json!({
581                    "name": f.graphql_name,
582                    "column": f.column,
583                    "type": type_value,
584                });
585                if let Some(desc) = &f.description {
586                    field_obj["description"] = serde_json::Value::String(desc.clone());
587                }
588                field_obj
589            }).collect();
590            let mut obj = serde_json::json!({
591                "name": graphql_name,
592                "kind": "array",
593                "fields": fields,
594            });
595            if let Some(desc) = description {
596                obj["description"] = serde_json::Value::String(desc.clone());
597            }
598            obj
599        },
600    }).collect())
601}
602
603/// Extract metric requests from the GraphQL selection set by inspecting
604/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
605/// the "count" field in the selection set and extract its `of` argument.
606fn extract_metric_requests(
607    ctx: &async_graphql::dynamic::ResolverContext,
608    cube: &CubeDefinition,
609) -> Vec<compiler::parser::MetricRequest> {
610    let mut requests = Vec::new();
611
612    for sub_field in ctx.ctx.field().selection_set() {
613        let name = sub_field.name();
614        if !cube.has_metric(name) {
615            continue;
616        }
617
618        let args = match sub_field.arguments() {
619            Ok(args) => args,
620            Err(_) => continue,
621        };
622
623        let of_dimension = args
624            .iter()
625            .find(|(k, _)| k.as_str() == "of")
626            .and_then(|(_, v)| match v {
627                async_graphql::Value::Enum(e) => Some(e.to_string()),
628                async_graphql::Value::String(s) => Some(s.clone()),
629                _ => None,
630            })
631            .unwrap_or_else(|| "*".to_string());
632
633        let select_where_value = args
634            .iter()
635            .find(|(k, _)| k.as_str() == "selectWhere")
636            .map(|(_, v)| v.clone());
637
638        let condition_filter = args
639            .iter()
640            .find(|(k, _)| k.as_str() == "if")
641            .and_then(|(_, v)| {
642                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
643                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
644            });
645
646        let gql_alias = sub_field.alias().unwrap_or(name).to_string();
647        requests.push(compiler::parser::MetricRequest {
648            function: name.to_string(),
649            alias: gql_alias,
650            of_dimension,
651            select_where_value,
652            condition_filter,
653        });
654    }
655
656    requests
657}
658
659fn extract_requested_fields(
660    ctx: &async_graphql::dynamic::ResolverContext,
661    cube: &CubeDefinition,
662) -> HashSet<String> {
663    let mut fields = HashSet::new();
664    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
665    fields
666}
667
668fn collect_selection_paths(
669    field: &async_graphql::SelectionField<'_>,
670    prefix: &str,
671    out: &mut HashSet<String>,
672    metrics: &[MetricDef],
673) {
674    for sub in field.selection_set() {
675        let name = sub.name();
676        if metrics.iter().any(|m| m.name == name) {
677            continue;
678        }
679        let path = if prefix.is_empty() {
680            name.to_string()
681        } else {
682            format!("{prefix}_{name}")
683        };
684        let has_children = sub.selection_set().next().is_some();
685        if has_children {
686            collect_selection_paths(&sub, &path, out, metrics);
687        } else {
688            out.insert(path);
689        }
690    }
691}
692
693/// Maps `{parent_path}_{graphql_alias}` → ClickHouse column name for aliased dimension fields.
694/// Used by `calculate` to resolve `$TokenSupplyUpdate_post` → column.
695pub type FieldAliasMap = Vec<(String, String)>;
696
697/// Describes a `quantile(of: ..., level: ...)` request.
698pub struct QuantileRequest {
699    pub alias: String,
700    pub of_dimension: String,
701    pub level: f64,
702}
703
704/// Describes a `calculate(expression: "...")` request.
705pub struct CalculateRequest {
706    pub alias: String,
707    pub expression: String,
708}
709
710/// Describes a time interval bucketing request.
711/// `Time(interval: {in: minutes, count: 1})` → replaces column with `toStartOfInterval(ts, ...)`
712pub struct TimeIntervalRequest {
713    pub field_path: String,
714    pub graphql_alias: String,
715    pub column: String,
716    pub unit: String,
717    pub count: i64,
718}
719
720/// Describes a dimension-level aggregation request extracted from the selection set.
721/// `PostBalance(maximum: Block_Slot)` → DimAggRequest { agg_type: ArgMax, ... }
722pub struct DimAggRequest {
723    pub field_path: String,
724    pub graphql_alias: String,
725    pub value_column: String,
726    pub agg_type: DimAggType,
727    pub compare_column: String,
728    pub condition_filter: Option<FilterNode>,
729    pub select_where_value: Option<async_graphql::Value>,
730}
731
732fn extract_quantile_requests(
733    ctx: &async_graphql::dynamic::ResolverContext,
734) -> Vec<QuantileRequest> {
735    let mut requests = Vec::new();
736    for sub_field in ctx.ctx.field().selection_set() {
737        if sub_field.name() != "quantile" { continue; }
738        let args = match sub_field.arguments() {
739            Ok(a) => a,
740            Err(_) => continue,
741        };
742        let of_dim = args.iter()
743            .find(|(k, _)| k.as_str() == "of")
744            .and_then(|(_, v)| match v {
745                async_graphql::Value::Enum(e) => Some(e.to_string()),
746                async_graphql::Value::String(s) => Some(s.clone()),
747                _ => None,
748            })
749            .unwrap_or_else(|| "*".to_string());
750        let level = args.iter()
751            .find(|(k, _)| k.as_str() == "level")
752            .and_then(|(_, v)| match v {
753                async_graphql::Value::Number(n) => n.as_f64(),
754                _ => None,
755            })
756            .unwrap_or(0.5);
757        let alias = sub_field.alias().unwrap_or("quantile").to_string();
758        requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
759    }
760    requests
761}
762
763fn extract_field_aliases(
764    ctx: &async_graphql::dynamic::ResolverContext,
765    cube: &CubeDefinition,
766) -> FieldAliasMap {
767    let flat = cube.flat_dimensions();
768    let mut aliases = Vec::new();
769    collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
770    aliases
771}
772
773fn collect_field_aliases(
774    field: &async_graphql::SelectionField<'_>,
775    prefix: &str,
776    flat: &[(String, crate::cube::definition::Dimension)],
777    metrics: &[MetricDef],
778    out: &mut FieldAliasMap,
779) {
780    for sub in field.selection_set() {
781        let name = sub.name();
782        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
783            continue;
784        }
785        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
786        let has_children = sub.selection_set().next().is_some();
787        if has_children {
788            collect_field_aliases(&sub, &path, flat, metrics, out);
789        } else if let Some(alias) = sub.alias() {
790            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
791                let alias_path = if prefix.is_empty() {
792                    alias.to_string()
793                } else {
794                    format!("{prefix}_{alias}")
795                };
796                out.push((alias_path, dim.column.clone()));
797            }
798        }
799    }
800}
801
802fn extract_calculate_requests(
803    ctx: &async_graphql::dynamic::ResolverContext,
804) -> Vec<CalculateRequest> {
805    let mut requests = Vec::new();
806    for sub_field in ctx.ctx.field().selection_set() {
807        if sub_field.name() != "calculate" { continue; }
808        let args = match sub_field.arguments() {
809            Ok(a) => a,
810            Err(_) => continue,
811        };
812        let expression = args.iter()
813            .find(|(k, _)| k.as_str() == "expression")
814            .and_then(|(_, v)| match v {
815                async_graphql::Value::String(s) => Some(s.clone()),
816                _ => None,
817            });
818        if let Some(expr) = expression {
819            let alias = sub_field.alias().unwrap_or("calculate").to_string();
820            requests.push(CalculateRequest { alias, expression: expr });
821        }
822    }
823    requests
824}
825
826fn extract_dim_agg_requests(
827    ctx: &async_graphql::dynamic::ResolverContext,
828    cube: &CubeDefinition,
829) -> Vec<DimAggRequest> {
830    let flat = cube.flat_dimensions();
831    let mut requests = Vec::new();
832    collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
833    requests
834}
835
836fn collect_dim_agg_paths(
837    field: &async_graphql::SelectionField<'_>,
838    prefix: &str,
839    flat: &[(String, crate::cube::definition::Dimension)],
840    metrics: &[MetricDef],
841    cube: &CubeDefinition,
842    out: &mut Vec<DimAggRequest>,
843) {
844    for sub in field.selection_set() {
845        let name = sub.name();
846        if metrics.iter().any(|m| m.name == name) {
847            continue;
848        }
849        let path = if prefix.is_empty() {
850            name.to_string()
851        } else {
852            format!("{prefix}_{name}")
853        };
854        let has_children = sub.selection_set().next().is_some();
855        if has_children {
856            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
857        } else {
858            let args = match sub.arguments() {
859                Ok(a) => a,
860                Err(_) => continue,
861            };
862            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
863            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
864
865            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
866                let cp = match v {
867                    async_graphql::Value::Enum(e) => e.to_string(),
868                    async_graphql::Value::String(s) => s.clone(),
869                    _ => continue,
870                };
871                (DimAggType::ArgMax, cp)
872            } else if let Some((_, v)) = min_val {
873                let cp = match v {
874                    async_graphql::Value::Enum(e) => e.to_string(),
875                    async_graphql::Value::String(s) => s.clone(),
876                    _ => continue,
877                };
878                (DimAggType::ArgMin, cp)
879            } else {
880                continue;
881            };
882
883            let value_column = flat.iter()
884                .find(|(p, _)| p == &path)
885                .map(|(_, dim)| dim.column.clone());
886            let compare_column = flat.iter()
887                .find(|(p, _)| p == &compare_path)
888                .map(|(_, dim)| dim.column.clone());
889
890            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
891                let condition_filter = args.iter()
892                    .find(|(k, _)| k.as_str() == "if")
893                    .and_then(|(_, v)| {
894                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
895                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
896                    });
897                let select_where_value = args.iter()
898                    .find(|(k, _)| k.as_str() == "selectWhere")
899                    .map(|(_, v)| v.clone());
900
901                let gql_alias = sub.alias()
902                    .map(|a| a.to_string())
903                    .unwrap_or_else(|| path.clone());
904                out.push(DimAggRequest {
905                    field_path: path,
906                    graphql_alias: gql_alias,
907                    value_column: vc,
908                    agg_type,
909                    compare_column: cc,
910                    condition_filter,
911                    select_where_value,
912                });
913            }
914        }
915    }
916}
917
918fn extract_time_interval_requests(
919    ctx: &async_graphql::dynamic::ResolverContext,
920    cube: &CubeDefinition,
921) -> Vec<TimeIntervalRequest> {
922    let flat = cube.flat_dimensions();
923    let mut requests = Vec::new();
924    collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
925    requests
926}
927
928fn collect_time_interval_paths(
929    field: &async_graphql::SelectionField<'_>,
930    prefix: &str,
931    flat: &[(String, crate::cube::definition::Dimension)],
932    metrics: &[MetricDef],
933    out: &mut Vec<TimeIntervalRequest>,
934) {
935    for sub in field.selection_set() {
936        let name = sub.name();
937        if metrics.iter().any(|m| m.name == name) { continue; }
938        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
939        let has_children = sub.selection_set().next().is_some();
940        if has_children {
941            collect_time_interval_paths(&sub, &path, flat, metrics, out);
942        } else {
943            let args = match sub.arguments() {
944                Ok(a) => a,
945                Err(_) => continue,
946            };
947            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
948            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
949                let unit = obj.get("in")
950                    .and_then(|v| match v {
951                        async_graphql::Value::Enum(e) => Some(e.to_string()),
952                        async_graphql::Value::String(s) => Some(s.clone()),
953                        _ => None,
954                    });
955                let count = obj.get("count")
956                    .and_then(|v| match v {
957                        async_graphql::Value::Number(n) => n.as_i64(),
958                        _ => None,
959                    });
960                if let (Some(unit), Some(count)) = (unit, count) {
961                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
962                        let gql_alias = sub.alias().unwrap_or(name).to_string();
963                        out.push(TimeIntervalRequest {
964                            field_path: path,
965                            graphql_alias: gql_alias,
966                            column: dim.column.clone(),
967                            unit,
968                            count,
969                        });
970                    }
971                }
972            }
973        }
974    }
975}
976
977// ---------------------------------------------------------------------------
978// Per-Cube GraphQL type generation
979// ---------------------------------------------------------------------------
980
981struct CubeTypes {
982    objects: Vec<Object>,
983    inputs: Vec<InputObject>,
984    enums: Vec<Enum>,
985    unions: Vec<Union>,
986}
987
988fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
989    let record_name = format!("{}Record", cube.name);
990    let filter_name = format!("{}Filter", cube.name);
991    let compare_enum_name = format!("{}CompareFields", cube.name);
992
993    let flat_dims = cube.flat_dimensions();
994
995    let mut compare_enum = Enum::new(&compare_enum_name)
996        .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
997    for (path, _) in &flat_dims {
998        compare_enum = compare_enum.item(EnumItem::new(path));
999    }
1000
1001    let mut record_fields: Vec<Field> = Vec::new();
1002    let mut filter_fields: Vec<InputValue> = Vec::new();
1003    let mut extra_objects: Vec<Object> = Vec::new();
1004    let mut extra_inputs: Vec<InputObject> = Vec::new();
1005    let mut extra_unions: Vec<Union> = Vec::new();
1006
1007    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
1008        .description("OR combinator — matches if any sub-filter matches"));
1009
1010    {
1011        let mut collector = DimCollector {
1012            cube_name: &cube.name,
1013            compare_enum_name: &compare_enum_name,
1014            filter_name: &filter_name,
1015            record_fields: &mut record_fields,
1016            filter_fields: &mut filter_fields,
1017            extra_objects: &mut extra_objects,
1018            extra_inputs: &mut extra_inputs,
1019            extra_unions: &mut extra_unions,
1020        };
1021        for node in &cube.dimensions {
1022            collect_dimension_types(node, "", &mut collector);
1023        }
1024    }
1025
1026    let mut metric_enums: Vec<Enum> = Vec::new();
1027    let builtin_descs: std::collections::HashMap<&str, &str> = [
1028        ("count", "Count of rows or distinct values"),
1029        ("sum", "Sum of values"),
1030        ("avg", "Average of values"),
1031        ("min", "Minimum value"),
1032        ("max", "Maximum value"),
1033        ("uniq", "Count of unique (distinct) values"),
1034    ].into_iter().collect();
1035
1036    for metric in &cube.metrics {
1037        let metric_name = &metric.name;
1038        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1039
1040        if metric.supports_where {
1041            extra_inputs.push(
1042                InputObject::new(&select_where_name)
1043                    .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1044                    .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1045                    .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1046                    .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1047                    .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1048                    .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1049            );
1050        }
1051
1052        let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1053        let mut of_enum = Enum::new(&of_enum_name)
1054            .description(format!("Dimension to apply {} aggregation on", metric_name));
1055        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1056        metric_enums.push(of_enum);
1057
1058        let metric_name_clone = metric_name.clone();
1059        let return_type_ref = dim_type_to_typeref(&metric.return_type);
1060        let metric_desc = metric.description.as_deref()
1061            .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1062            .unwrap_or("Aggregate metric");
1063
1064        let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1065            let default_name = metric_name_clone.clone();
1066            FieldFuture::new(async move {
1067                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1068                let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1069                let key = metric_key(alias);
1070                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1071                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1072            })
1073        })
1074        .description(metric_desc)
1075        .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1076            .description("Dimension to aggregate on (default: all rows)"));
1077
1078        if metric.supports_where {
1079            metric_field = metric_field
1080                .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1081                    .description("Post-aggregation filter (HAVING)"))
1082                .argument(InputValue::new("if", TypeRef::named(&filter_name))
1083                    .description("Conditional filter for this metric"));
1084        }
1085
1086        record_fields.push(metric_field);
1087    }
1088
1089    // `quantile(of: ..., level: ...)` — percentile computation
1090    {
1091        let of_enum_name = format!("{}_quantile_Of", cube.name);
1092        let mut of_enum = Enum::new(&of_enum_name)
1093            .description(format!("Dimension to apply quantile on for {}", cube.name));
1094        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1095        metric_enums.push(of_enum);
1096
1097        let of_enum_for_closure = of_enum_name.clone();
1098        let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1099            FieldFuture::new(async move {
1100                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1101                let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1102                let key = metric_key(alias);
1103                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1104                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1105            })
1106        })
1107        .description("Compute a quantile (percentile) of a dimension")
1108        .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1109            .description("Dimension to compute quantile on"))
1110        .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1111            .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1112        record_fields.push(quantile_field);
1113    }
1114
1115    // `calculate(expression: "...")` — runtime expression computation
1116    {
1117        let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1118            FieldFuture::new(async move {
1119                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1120                let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1121                let key = metric_key(alias);
1122                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1123                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1124            })
1125        })
1126        .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1127        .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1128            .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1129        record_fields.push(calculate_field);
1130    }
1131
1132    // Add join fields: joinXxx returns the target cube's Record type
1133    for jd in &cube.joins {
1134        let target_record_name = format!("{}Record", jd.target_cube);
1135        let field_name_owned = jd.field_name.clone();
1136        let mut join_field = Field::new(
1137            &jd.field_name,
1138            TypeRef::named(&target_record_name),
1139            move |ctx| {
1140                let field_name = field_name_owned.clone();
1141                FieldFuture::new(async move {
1142                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1143                    if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1144                        let sub_row: RowMap = obj.iter()
1145                            .map(|(k, v)| (k.clone(), v.clone()))
1146                            .collect();
1147                        Ok(Some(FieldValue::owned_any(sub_row)))
1148                    } else {
1149                        Ok(Some(FieldValue::value(Value::Null)))
1150                    }
1151                })
1152            },
1153        );
1154        if let Some(desc) = &jd.description {
1155            join_field = join_field.description(desc);
1156        }
1157        record_fields.push(join_field);
1158    }
1159
1160    let mut record = Object::new(&record_name);
1161    for f in record_fields { record = record.field(f); }
1162
1163    let mut filter = InputObject::new(&filter_name)
1164        .description(format!("Filter conditions for {} query", cube.name));
1165    for f in filter_fields { filter = filter.field(f); }
1166
1167    let orderby_input_name = format!("{}OrderByInput", cube.name);
1168    let orderby_input = InputObject::new(&orderby_input_name)
1169        .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1170        .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1171            .description("Sort descending by this field"))
1172        .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1173            .description("Sort ascending by this field"))
1174        .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1175            .description("Sort descending by computed/aggregated field name"))
1176        .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1177            .description("Sort ascending by computed/aggregated field name"));
1178
1179    let mut objects = vec![record]; objects.extend(extra_objects);
1180    let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
1181    let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1182
1183    CubeTypes { objects, inputs, enums, unions: extra_unions }
1184}
1185
1186struct DimCollector<'a> {
1187    cube_name: &'a str,
1188    compare_enum_name: &'a str,
1189    filter_name: &'a str,
1190    record_fields: &'a mut Vec<Field>,
1191    filter_fields: &'a mut Vec<InputValue>,
1192    extra_objects: &'a mut Vec<Object>,
1193    extra_inputs: &'a mut Vec<InputObject>,
1194    extra_unions: &'a mut Vec<Union>,
1195}
1196
1197fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1198    match node {
1199        DimensionNode::Leaf(dim) => {
1200            let col = dim.column.clone();
1201            let is_datetime = dim.dim_type == DimType::DateTime;
1202            let compare_enum = c.compare_enum_name.to_string();
1203            let cube_filter = c.filter_name.to_string();
1204            let mut leaf_field = Field::new(
1205                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1206                move |ctx| {
1207                    let col = col.clone();
1208                    FieldFuture::new(async move {
1209                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1210                        let has_interval = ctx.args.try_get("interval").is_ok();
1211                        let has_max = ctx.args.try_get("maximum").is_ok();
1212                        let has_min = ctx.args.try_get("minimum").is_ok();
1213                        let key = if has_interval || has_max || has_min {
1214                            let name = ctx.ctx.field().alias().unwrap_or(&col);
1215                            dim_agg_key(name)
1216                        } else {
1217                            col.clone()
1218                        };
1219                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1220                        let gql_val = if is_datetime {
1221                            json_to_gql_datetime(val)
1222                        } else {
1223                            json_to_gql_value(val)
1224                        };
1225                        Ok(Some(FieldValue::value(gql_val)))
1226                    })
1227                },
1228            );
1229            if let Some(desc) = &dim.description {
1230                leaf_field = leaf_field.description(desc);
1231            }
1232            leaf_field = leaf_field
1233                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1234                    .description("Return value from row where compare field is maximum (argMax)"))
1235                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1236                    .description("Return value from row where compare field is minimum (argMin)"))
1237                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1238                    .description("Conditional filter for aggregation"))
1239                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1240                    .description("Post-aggregation value filter (HAVING)"));
1241            if is_datetime {
1242                leaf_field = leaf_field
1243                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1244                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1245            }
1246            // Update resolver key: if interval is present, read from the interval expression key
1247            // This is handled by the resolver checking ctx.args for "interval"
1248            c.record_fields.push(leaf_field);
1249            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1250        }
1251        DimensionNode::Group { graphql_name, description, children } => {
1252            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1253            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1254            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1255
1256            let mut child_record_fields: Vec<Field> = Vec::new();
1257            let mut child_filter_fields: Vec<InputValue> = Vec::new();
1258            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1259
1260            let mut child_collector = DimCollector {
1261                cube_name: c.cube_name,
1262                compare_enum_name: c.compare_enum_name,
1263                filter_name: c.filter_name,
1264                record_fields: &mut child_record_fields,
1265                filter_fields: &mut child_filter_fields,
1266                extra_objects: c.extra_objects,
1267                extra_inputs: c.extra_inputs,
1268                extra_unions: c.extra_unions,
1269            };
1270            for child in children {
1271                collect_dimension_types(child, &new_prefix, &mut child_collector);
1272            }
1273
1274            let mut nested_record = Object::new(&nested_record_name);
1275            for f in child_record_fields { nested_record = nested_record.field(f); }
1276
1277            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1278            let mut nested_filter = InputObject::new(&nested_filter_name)
1279                .description(nested_filter_desc);
1280            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1281
1282            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1283                FieldFuture::new(async move {
1284                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1285                    Ok(Some(FieldValue::owned_any(row.clone())))
1286                })
1287            });
1288            if let Some(desc) = description {
1289                group_field = group_field.description(desc);
1290            }
1291            c.record_fields.push(group_field);
1292            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1293            c.extra_objects.push(nested_record);
1294            c.extra_inputs.push(nested_filter);
1295        }
1296        DimensionNode::Array { graphql_name, description, children } => {
1297            let full_path = if prefix.is_empty() {
1298                graphql_name.clone()
1299            } else {
1300                format!("{prefix}_{graphql_name}")
1301            };
1302            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1303            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1304
1305            let mut element_obj = Object::new(&element_type_name);
1306            let mut includes_filter = InputObject::new(&includes_filter_name)
1307                .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1308
1309            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1310
1311            for child in children {
1312                match &child.field_type {
1313                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1314                        let col_name = child.column.clone();
1315                        let is_datetime = *dt == DimType::DateTime;
1316                        let mut field = Field::new(
1317                            &child.graphql_name,
1318                            dim_type_to_typeref(dt),
1319                            move |ctx| {
1320                                let col = col_name.clone();
1321                                FieldFuture::new(async move {
1322                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1323                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1324                                    let gql_val = if is_datetime {
1325                                        json_to_gql_datetime(val)
1326                                    } else {
1327                                        json_to_gql_value(val)
1328                                    };
1329                                    Ok(Some(FieldValue::value(gql_val)))
1330                                })
1331                            },
1332                        );
1333                        if let Some(desc) = &child.description {
1334                            field = field.description(desc);
1335                        }
1336                        element_obj = element_obj.field(field);
1337                        includes_filter = includes_filter.field(
1338                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1339                        );
1340                    }
1341                    crate::cube::definition::ArrayFieldType::Union(variants) => {
1342                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1343
1344                        let mut gql_union = Union::new(&union_name);
1345                        let mut variant_objects = Vec::new();
1346
1347                        for v in variants {
1348                            let variant_obj_name = v.type_name.clone();
1349                            let field_name = v.field_name.clone();
1350                            let source_type = v.source_type.clone();
1351                            let type_ref = dim_type_to_typeref(&source_type);
1352
1353                            let val_col = child.column.clone();
1354                            let variant_obj = Object::new(&variant_obj_name)
1355                                .field(Field::new(
1356                                    &field_name,
1357                                    type_ref,
1358                                    move |ctx| {
1359                                        let col = val_col.clone();
1360                                        FieldFuture::new(async move {
1361                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1362                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1363                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
1364                                        })
1365                                    },
1366                                ));
1367                            gql_union = gql_union.possible_type(&variant_obj_name);
1368                            variant_objects.push(variant_obj);
1369                        }
1370
1371                        let col_name = child.column.clone();
1372                        let type_col = children.iter()
1373                            .find(|f| f.graphql_name == "Type")
1374                            .map(|f| f.column.clone())
1375                            .unwrap_or_default();
1376                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1377
1378                        let mut field = Field::new(
1379                            &child.graphql_name,
1380                            TypeRef::named(&union_name),
1381                            move |ctx| {
1382                                let col = col_name.clone();
1383                                let tcol = type_col.clone();
1384                                let vars = variants_clone.clone();
1385                                FieldFuture::new(async move {
1386                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1387                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1388                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
1389                                        return Ok(None);
1390                                    }
1391                                    let type_str = row.get(&tcol)
1392                                        .and_then(|v| v.as_str())
1393                                        .unwrap_or("");
1394                                    let resolved_type = resolve_union_typename(type_str, &vars);
1395                                    let mut elem = RowMap::new();
1396                                    elem.insert(col.clone(), raw_val);
1397                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1398                                })
1399                            },
1400                        );
1401                        if let Some(desc) = &child.description {
1402                            field = field.description(desc);
1403                        }
1404                        element_obj = element_obj.field(field);
1405
1406                        includes_filter = includes_filter.field(
1407                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1408                        );
1409
1410                        union_registrations.push((union_name, gql_union, variant_objects));
1411                    }
1412                }
1413            }
1414
1415            // Register union types and variant objects
1416            for (_, union_type, variant_objs) in union_registrations {
1417                c.extra_objects.extend(variant_objs);
1418                // Unions need to be registered differently — store in extra_objects
1419                // by wrapping in a dummy. Actually, we need to register unions via builder.
1420                // For now, we'll store them in a new field. Let's use the enums vec hack:
1421                // Actually, async-graphql dynamic schema registers unions the same way.
1422                // We need to pass them up. Let's add a unions vec to DimCollector.
1423                // For now, store union as Object (it won't work). We need to refactor.
1424                // Let me add unions to CubeTypes.
1425                c.extra_unions.push(union_type);
1426            }
1427
1428            // Build the list resolver: zip parallel array columns into element objects
1429            let child_columns: Vec<(String, String)> = children.iter()
1430                .map(|f| (f.graphql_name.clone(), f.column.clone()))
1431                .collect();
1432            let element_type_name_clone = element_type_name.clone();
1433
1434            let mut array_field = Field::new(
1435                graphql_name,
1436                TypeRef::named_nn_list_nn(&element_type_name),
1437                move |ctx| {
1438                    let cols = child_columns.clone();
1439                    let _etype = element_type_name_clone.clone();
1440                    FieldFuture::new(async move {
1441                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1442                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1443                            .map(|(gql_name, col)| {
1444                                let arr = row.get(col)
1445                                    .and_then(|v| v.as_array())
1446                                    .cloned()
1447                                    .unwrap_or_default();
1448                                (gql_name.as_str(), arr)
1449                            })
1450                            .collect();
1451
1452                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1453                        let mut elements = Vec::with_capacity(len);
1454                        for i in 0..len {
1455                            let mut elem = RowMap::new();
1456                            for (gql_name, arr) in &arrays {
1457                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1458                                elem.insert(gql_name.to_string(), val);
1459                            }
1460                            // Also store by column name for Union resolvers
1461                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1462                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1463                                elem.insert(col.clone(), val);
1464                            }
1465                            elements.push(FieldValue::owned_any(elem));
1466                        }
1467                        Ok(Some(FieldValue::list(elements)))
1468                    })
1469                },
1470            );
1471            if let Some(desc) = description {
1472                array_field = array_field.description(desc);
1473            }
1474            c.record_fields.push(array_field);
1475
1476            // Wrap the element-level IncludesFilter in an ArrayFilter so the
1477            // parser can find the `includes` key (compiler/filter.rs expects
1478            // `{ includes: [{ Name: { is: "..." } }] }`).
1479            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
1480            let wrapper_filter = InputObject::new(&wrapper_filter_name)
1481                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
1482                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
1483                    .description("Match rows where at least one array element satisfies all conditions"));
1484            c.extra_inputs.push(wrapper_filter);
1485
1486            c.filter_fields.push(InputValue::new(
1487                graphql_name,
1488                TypeRef::named(&wrapper_filter_name),
1489            ));
1490
1491            c.extra_objects.push(element_obj);
1492            c.extra_inputs.push(includes_filter);
1493        }
1494    }
1495}
1496
1497/// Resolve the GraphQL Union __typename from a discriminator column value.
1498///
1499/// Walks variants in order; the first whose `source_type_names` contains
1500/// `type_str` wins.  If none match, the last variant is used as fallback
1501/// (conventionally the "catch-all" variant such as JSON).
1502fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1503    for v in variants {
1504        if v.source_type_names.iter().any(|s| s == type_str) {
1505            return v.type_name.clone();
1506        }
1507    }
1508    variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1509}
1510
1511fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1512    match dt {
1513        DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
1514        DimType::Int => TypeRef::named(TypeRef::INT),
1515        DimType::Float => TypeRef::named(TypeRef::FLOAT),
1516        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1517    }
1518}
1519
1520fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1521    match dt {
1522        DimType::String => "StringFilter",
1523        DimType::Int => "IntFilter",
1524        DimType::Float => "FloatFilter",
1525        DimType::DateTime => "DateTimeFilter",
1526        DimType::Bool => "BoolFilter",
1527    }
1528}
1529
1530pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1531    match v {
1532        serde_json::Value::Null => Value::Null,
1533        serde_json::Value::Bool(b) => Value::from(b),
1534        serde_json::Value::Number(n) => {
1535            if let Some(i) = n.as_i64() { Value::from(i) }
1536            else if let Some(f) = n.as_f64() { Value::from(f) }
1537            else { Value::from(n.to_string()) }
1538        }
1539        serde_json::Value::String(s) => Value::from(s),
1540        _ => Value::from(v.to_string()),
1541    }
1542}
1543
1544/// Build a JoinExpr from a JoinDef and target cube definition.
1545/// Inspects the sub-selection of the join field to determine which columns to SELECT.
1546fn build_join_expr(
1547    jd: &crate::cube::definition::JoinDef,
1548    target_cube: &CubeDefinition,
1549    sub_field: &async_graphql::SelectionField<'_>,
1550    network: &str,
1551    join_idx: usize,
1552) -> JoinExpr {
1553    let target_flat = target_cube.flat_dimensions();
1554    let target_table = target_cube.table_for_chain(network);
1555
1556    let mut requested_paths = HashSet::new();
1557    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1558
1559    let mut selects: Vec<SelectExpr> = target_flat.iter()
1560        .filter(|(path, _)| requested_paths.contains(path))
1561        .map(|(_, dim)| SelectExpr::Column {
1562            column: dim.column.clone(),
1563            alias: None,
1564        })
1565        .collect();
1566
1567    if selects.is_empty() {
1568        selects = target_flat.iter()
1569            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1570            .collect();
1571    }
1572
1573    let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
1574
1575    let group_by = if is_aggregate {
1576        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1577        for sel in &selects {
1578            if let SelectExpr::Column { column, .. } = sel {
1579                if !column.contains('(') && !gb.contains(column) {
1580                    gb.push(column.clone());
1581                }
1582            }
1583        }
1584        gb
1585    } else {
1586        vec![]
1587    };
1588
1589    JoinExpr {
1590        schema: target_cube.schema.clone(),
1591        table: target_table,
1592        alias: format!("_j{}", join_idx),
1593        conditions: jd.conditions.clone(),
1594        selects,
1595        group_by,
1596        use_final: target_cube.use_final,
1597        is_aggregate,
1598        target_cube: jd.target_cube.clone(),
1599        join_field: sub_field.name().to_string(),
1600        join_type: jd.join_type.clone(),
1601    }
1602}
1603
1604/// Convert a ClickHouse DateTime value to ISO 8601 format.
1605/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
1606fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1607    match v {
1608        serde_json::Value::String(s) => {
1609            let iso = if s.contains('T') {
1610                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1611            } else {
1612                let replaced = s.replacen(' ', "T", 1);
1613                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1614            };
1615            Value::from(iso)
1616        }
1617        other => json_to_gql_value(other),
1618    }
1619}