Skip to main content

activecube_rs/schema/
generator.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Builds the canonical row key for a metric's SQL alias by prefixing the
/// alias with two underscores (`count` becomes `__count`).
/// Shared by the parser and the resolver so both derive the same key.
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Builds the canonical row key for a dimension aggregate / time interval
/// SQL alias by prefixing the alias with `__da_` (`Time` becomes `__da_Time`).
pub fn dim_agg_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 5);
    key.push_str("__da_");
    key.push_str(alias);
    key
}
20
/// Async function type that executes a compiled SQL query and returns rows.
/// The service layer provides this — the library never touches a database directly.
///
/// The callable receives the SQL text and the bound parameter values, and
/// returns a boxed, pinned, `Send` future that resolves to either the result
/// rows or an error message string. The callable itself is shared through an
/// `Arc` and must be `Send + Sync`.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// A custom argument on the chain wrapper field (e.g. dataset, aggregates).
/// The library registers it on the GraphQL wrapper type and stores the resolved
/// value in `ChainContext.extra` for child cube resolvers to read.
///
/// The wrapper resolver reads the argument as an enum name, then as a boolean,
/// then as a string, and stores the first successful reading as text; an
/// argument that is absent or of another kind is not stored.
#[derive(Debug, Clone)]
pub struct WrapperArg {
    /// GraphQL argument name (e.g. "dataset").
    /// Also the key under which the value is stored in `ChainContext.extra`.
    pub name: String,
    /// Name of a type already registered in the schema (e.g. "Dataset").
    /// The argument is declared with this type as a nullable (optional) input.
    pub type_ref: String,
    /// Description shown in schema introspection.
    pub description: String,
    /// Enum values, used by builder schema generation. None for non-enum types.
    pub values: Option<Vec<String>>,
}
43
/// Describes one top-level chain wrapper type (e.g. EVM, Solana, Trading).
///
/// `build_schema` turns each config into a wrapper object type holding the
/// fields of every cube that lists `group`, plus a same-named field on the
/// root query type.
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    /// Wrapper type name in the schema, e.g. "EVM", "Solana", "Trading".
    /// Also used as the root query field name and as the prefix of the
    /// generated `{name}Network` enum.
    pub name: String,
    /// Which ChainGroup enum value this config represents.
    pub group: ChainGroup,
    /// Available networks. EVM: ["eth","bsc"]; Solana: ["sol"]; Trading: all chains.
    /// The first entry is the network used when no `network` argument applies
    /// (an empty list yields an empty network string).
    pub networks: Vec<String>,
    /// If true the wrapper type takes a `network` argument (EVM style).
    /// If false the chain is implicit (Solana style) or cross-chain (Trading).
    pub has_network_arg: bool,
    /// Additional arguments on the wrapper field, passed through to child resolvers
    /// via `ChainContext.extra`.
    pub extra_args: Vec<WrapperArg>,
}
60
/// Callback to transform the resolved table name based on `ChainContext`.
/// Called after `resolve_table` picks the base table, before SQL compilation.
/// Arguments: `(table_name, chain_context) -> transformed_table_name`.
///
/// The cube resolver looks this up in the schema data and applies it to the
/// query's table only when the parent value is a `ChainContext`; the returned
/// string replaces the table name before validation and compilation.
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
65
/// Configuration for supported networks (chains) and optional stats collection.
///
/// Passed by value to `build_schema`.
pub struct SchemaConfig {
    /// Chain wrapper groups; each becomes a wrapper object type and a
    /// same-named field on the root query type.
    pub chain_groups: Vec<ChainGroupConfig>,
    /// Name for the root query (the default configuration uses "ChainStream").
    /// NOTE(review): the `build_schema` shown in this file hard-codes the root
    /// type name "Query" and never reads this field — confirm where it is used.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    /// A `StatsCallback` found in the request's context data takes precedence
    /// over this one.
    pub stats_callback: Option<StatsCallback>,
    /// Optional hook to transform table names based on chain context (e.g. dataset suffix).
    pub table_name_transform: Option<TableNameTransform>,
    /// Additional global enum types to register in the schema (e.g. Dataset, Aggregates).
    pub extra_types: Vec<Enum>,
}
78
79impl Default for SchemaConfig {
80    fn default() -> Self {
81        Self {
82            chain_groups: vec![
83                ChainGroupConfig {
84                    name: "EVM".to_string(),
85                    group: ChainGroup::Evm,
86                    networks: vec!["eth".into(), "bsc".into()],
87                    has_network_arg: true,
88                    extra_args: vec![],
89                },
90                ChainGroupConfig {
91                    name: "Solana".to_string(),
92                    group: ChainGroup::Solana,
93                    networks: vec!["sol".into()],
94                    has_network_arg: false,
95                    extra_args: vec![],
96                },
97                ChainGroupConfig {
98                    name: "Trading".to_string(),
99                    group: ChainGroup::Trading,
100                    networks: vec!["sol".into(), "eth".into(), "bsc".into()],
101                    has_network_arg: false,
102                    extra_args: vec![],
103                },
104            ],
105            root_query_name: "ChainStream".to_string(),
106            stats_callback: None,
107            table_name_transform: None,
108            extra_types: vec![],
109        }
110    }
111}
112
/// Stored in the parent value of chain wrapper types so cube resolvers can
/// retrieve the active chain without a `network` argument on themselves.
/// Service-layer extra_args values are available in the `extra` map.
///
/// Derives `Debug` and `Clone` like the other public configuration types in
/// this module, so the context can be logged and copied by callers.
#[derive(Debug, Clone)]
pub struct ChainContext {
    /// Network selected by the wrapper field: the `network` argument when the
    /// group takes one, otherwise the group's first configured network.
    pub network: String,
    /// Resolved extra wrapper arguments, keyed by argument name, with enum,
    /// boolean, and string values all stored as text.
    pub extra: HashMap<String, String>,
}
120
121/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
122///
123/// Schema shape (Bitquery-aligned):
124/// ```graphql
125/// type Query {
126///   EVM(network: EVMNetwork!): EVM!
127///   Solana: Solana!
128///   Trading: Trading!
129///   _cubeMetadata: String!
130/// }
131/// ```
132pub fn build_schema(
133    registry: CubeRegistry,
134    dialect: Arc<dyn SqlDialect>,
135    executor: QueryExecutor,
136    config: SchemaConfig,
137) -> Result<Schema, SchemaError> {
138    let mut builder = Schema::build("Query", None, None);
139
140    // --- shared input / enum types -------------------------------------------
141    builder = builder.register(filter_types::build_limit_input());
142    builder = builder.register(
143        InputObject::new("LimitByInput")
144            .description("Limit results per group (similar to ClickHouse LIMIT BY)")
145            .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
146                .description("Comma-separated dimension names to group by"))
147            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
148                .description("Maximum rows per group"))
149            .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
150                .description("Rows to skip per group")),
151    );
152    builder = builder.register(
153        Enum::new("OrderDirection")
154            .description("Sort direction")
155            .item(EnumItem::new("ASC").description("Ascending"))
156            .item(EnumItem::new("DESC").description("Descending")),
157    );
158    for input in filter_types::build_filter_primitives() {
159        builder = builder.register(input);
160    }
161    builder = builder.register(
162        Enum::new("TimeUnit")
163            .description("Time unit for interval bucketing")
164            .item(EnumItem::new("seconds"))
165            .item(EnumItem::new("minutes"))
166            .item(EnumItem::new("hours"))
167            .item(EnumItem::new("days"))
168            .item(EnumItem::new("weeks"))
169            .item(EnumItem::new("months")),
170    );
171    builder = builder.register(
172        InputObject::new("TimeIntervalInput")
173            .description("Time bucketing interval for DateTime dimensions")
174            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
175            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
176    );
177    builder = builder.register(
178        InputObject::new("DimSelectWhere")
179            .description("Post-aggregation filter for dimension values (HAVING clause)")
180            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
181            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
182            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
183            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
184            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
185            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
186    );
187
188    for extra_enum in config.extra_types {
189        builder = builder.register(extra_enum);
190    }
191
192    // --- per-group network enums ---------------------------------------------
193    for grp in &config.chain_groups {
194        if grp.has_network_arg {
195            let enum_name = format!("{}Network", grp.name);
196            let mut net_enum = Enum::new(&enum_name)
197                .description(format!("{} network selector", grp.name));
198            for net in &grp.networks {
199                net_enum = net_enum.item(EnumItem::new(net));
200            }
201            builder = builder.register(net_enum);
202        }
203    }
204
205    // --- register cube types once (shared across chain groups) ----------------
206    let mut registered_cubes: HashSet<String> = HashSet::new();
207    for cube in registry.cubes() {
208        if registered_cubes.contains(&cube.name) { continue; }
209        registered_cubes.insert(cube.name.clone());
210
211        let types = build_cube_types(cube);
212        for obj in types.objects { builder = builder.register(obj); }
213        for inp in types.inputs { builder = builder.register(inp); }
214        for en in types.enums { builder = builder.register(en); }
215        for un in types.unions { builder = builder.register(un); }
216    }
217
218    // --- build chain wrapper types + Query fields ----------------------------
219    let mut query = Object::new("Query");
220
221    for grp in &config.chain_groups {
222        let wrapper_type_name = grp.name.clone();
223
224        // Build the wrapper Object (e.g. "EVM", "Solana", "Trading") and attach
225        // cube fields that belong to this group.
226        let mut wrapper_obj = Object::new(&wrapper_type_name);
227
228        for cube in registry.cubes() {
229            if !cube.chain_groups.contains(&grp.group) { continue; }
230
231            let cube_field = build_cube_field(
232                cube,
233                dialect.clone(),
234                executor.clone(),
235                config.stats_callback.clone(),
236            );
237            wrapper_obj = wrapper_obj.field(cube_field);
238        }
239        builder = builder.register(wrapper_obj);
240
241        // Wrapper resolver on Query — stores ChainContext for child resolvers.
242        let default_network = grp.networks.first().cloned().unwrap_or_default();
243        let has_net_arg = grp.has_network_arg;
244        let net_enum_name = format!("{}Network", grp.name);
245        let wrapper_type_for_resolver = wrapper_type_name.clone();
246        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();
247
248        let mut wrapper_field = Field::new(
249            &wrapper_type_name,
250            TypeRef::named_nn(&wrapper_type_name),
251            move |ctx| {
252                let default = default_network.clone();
253                let has_arg = has_net_arg;
254                let arg_names = extra_arg_names.clone();
255                FieldFuture::new(async move {
256                    let network = if has_arg {
257                        ctx.args.try_get("network")
258                            .ok()
259                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
260                            .unwrap_or(default)
261                    } else {
262                        default
263                    };
264                    let mut extra = HashMap::new();
265                    for arg_name in &arg_names {
266                        if let Ok(val) = ctx.args.try_get(arg_name) {
267                            let resolved = val.enum_name().ok().map(|s| s.to_string())
268                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
269                                .or_else(|| val.string().ok().map(|s| s.to_string()));
270                            if let Some(v) = resolved {
271                                extra.insert(arg_name.clone(), v);
272                            }
273                        }
274                    }
275                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
276                })
277            },
278        );
279
280        if grp.has_network_arg {
281            wrapper_field = wrapper_field.argument(
282                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
283                    .description(format!("{} network to query", wrapper_type_for_resolver)),
284            );
285        }
286        for wa in &grp.extra_args {
287            wrapper_field = wrapper_field.argument(
288                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
289                    .description(&wa.description),
290            );
291        }
292
293        query = query.field(wrapper_field);
294    }
295
296    // --- _cubeMetadata -------------------------------------------------------
297    let metadata_registry = Arc::new(registry.clone());
298    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
299        .map(|g| (g.name.clone(), g.group.clone()))
300        .collect();
301    let metadata_field = Field::new(
302        "_cubeMetadata",
303        TypeRef::named_nn(TypeRef::STRING),
304        move |_ctx| {
305            let reg = metadata_registry.clone();
306            let groups = metadata_groups.clone();
307            FieldFuture::new(async move {
308                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
309                for (group_name, group_enum) in &groups {
310                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
311                        .filter(|c| c.chain_groups.contains(group_enum))
312                        .map(serialize_cube_metadata)
313                        .collect();
314                    group_metadata.push(serde_json::json!({
315                        "group": group_name,
316                        "cubes": cubes_in_group,
317                    }));
318                }
319                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
320                Ok(Some(FieldValue::value(Value::from(json))))
321            })
322        },
323    )
324    .description("Internal: returns JSON metadata about all cubes grouped by chain");
325    query = query.field(metadata_field);
326
327    builder = builder.register(query);
328    builder = builder.data(registry);
329
330    if let Some(transform) = config.table_name_transform.clone() {
331        builder = builder.data(transform);
332    }
333
334    builder.finish()
335}
336
337fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
338    serde_json::json!({
339        "name": cube.name,
340        "description": cube.description,
341        "schema": cube.schema,
342        "tablePattern": cube.table_pattern,
343        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
344        "metrics": cube.metrics.iter().map(|m| {
345            let mut obj = serde_json::json!({
346                "name": m.name,
347                "returnType": format!("{:?}", m.return_type),
348            });
349            if let Some(ref tmpl) = m.expression_template {
350                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
351            }
352            if let Some(ref desc) = m.description {
353                obj["description"] = serde_json::Value::String(desc.clone());
354            }
355            obj
356        }).collect::<Vec<_>>(),
357        "selectors": cube.selectors.iter().map(|s| {
358            serde_json::json!({
359                "name": s.graphql_name,
360                "column": s.column,
361                "type": format!("{:?}", s.dim_type),
362            })
363        }).collect::<Vec<_>>(),
364        "dimensions": serialize_dims(&cube.dimensions),
365        "joins": cube.joins.iter().map(|j| {
366            serde_json::json!({
367                "field": j.field_name,
368                "target": j.target_cube,
369                "joinType": format!("{:?}", j.join_type),
370            })
371        }).collect::<Vec<_>>(),
372        "tableRoutes": cube.table_routes.iter().map(|r| {
373            serde_json::json!({
374                "schema": r.schema,
375                "tablePattern": r.table_pattern,
376                "availableColumns": r.available_columns,
377                "priority": r.priority,
378            })
379        }).collect::<Vec<_>>(),
380        "defaultLimit": cube.default_limit,
381        "maxLimit": cube.max_limit,
382    })
383}
384
/// Build a single cube Field that reads `network` from the parent ChainContext.
///
/// The field is named after the cube and returns a non-null list of
/// `{Cube}Record`. Its resolver parses the selection into a query IR, appends
/// any selected joins, optionally rewrites the table name, validates and
/// compiles the IR to SQL, runs it through `executor`, reshapes the rows, and
/// reports stats.
///
/// Arguments registered on the field: `where`, `limit`, `limitBy`, `orderBy`,
/// plus one shorthand filter argument per cube selector.
///
/// * `cube` - definition used for the field name, description, and arguments;
///   the resolver looks the definition up again by name in the registry stored
///   in the schema data.
/// * `dialect` - compiles the validated IR into SQL text and bindings.
/// * `executor` - runs the compiled SQL and yields rows.
/// * `stats_cb` - schema-level stats callback, used only when the request
///   context data carries no `StatsCallback` of its own.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            // The resolver closure runs once per request, so each invocation
            // takes its own copies of the captured handles.
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // The wrapper resolver stores a ChainContext as the parent
                // value; when it is absent the network falls back to "sol".
                let chain_ctx = ctx.parent_value
                    .try_downcast_ref::<ChainContext>().ok();
                let network = chain_ctx
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Everything the parser needs is read from the GraphQL
                // selection set before the IR is built.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // A selected sub-field whose name matches one of the cube's
                // joins becomes a join expression. A join whose target cube is
                // not in the registry is skipped without an error.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                // The table-name transform applies only when one is stored in
                // the schema data and a ChainContext is available.
                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
                    if let Some(cctx) = chain_ctx {
                        ir.table = transform(&ir.table, cctx);
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                // `sql` is cloned because it is needed again for the stats below.
                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Move values returned under a compiler-chosen alias back to
                // their original key; an existing value under the original key
                // is kept.
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Columns named `{join alias}.{column}` are pulled out of the
                // flat row and nested as one JSON object under the join's field.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            // Keys are collected first so the row can be
                            // mutated while they are removed.
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // A StatsCallback in the request context data wins over the
                // schema-level callback.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                // Each row becomes the parent value of its record's resolvers.
                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // Each selector is exposed as an optional top-level shorthand filter
    // argument, typed by the selector's dimension type.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
540
541fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
542    serde_json::Value::Array(dims.iter().map(|d| match d {
543        DimensionNode::Leaf(dim) => {
544            let mut obj = serde_json::json!({
545                "name": dim.graphql_name,
546                "column": dim.column,
547                "type": format!("{:?}", dim.dim_type),
548            });
549            if let Some(desc) = &dim.description {
550                obj["description"] = serde_json::Value::String(desc.clone());
551            }
552            obj
553        },
554        DimensionNode::Group { graphql_name, description, children } => {
555            let mut obj = serde_json::json!({
556                "name": graphql_name,
557                "children": serialize_dims(children),
558            });
559            if let Some(desc) = description {
560                obj["description"] = serde_json::Value::String(desc.clone());
561            }
562            obj
563        },
564        DimensionNode::Array { graphql_name, description, children } => {
565            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
566                serde_json::json!({
567                    "name": f.graphql_name,
568                    "column": f.column,
569                    "type": format!("{:?}", f.field_type),
570                })
571            }).collect();
572            let mut obj = serde_json::json!({
573                "name": graphql_name,
574                "kind": "array",
575                "fields": fields,
576            });
577            if let Some(desc) = description {
578                obj["description"] = serde_json::Value::String(desc.clone());
579            }
580            obj
581        },
582    }).collect())
583}
584
585/// Extract metric requests from the GraphQL selection set by inspecting
586/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
587/// the "count" field in the selection set and extract its `of` argument.
588fn extract_metric_requests(
589    ctx: &async_graphql::dynamic::ResolverContext,
590    cube: &CubeDefinition,
591) -> Vec<compiler::parser::MetricRequest> {
592    let mut requests = Vec::new();
593
594    for sub_field in ctx.ctx.field().selection_set() {
595        let name = sub_field.name();
596        if !cube.has_metric(name) {
597            continue;
598        }
599
600        let args = match sub_field.arguments() {
601            Ok(args) => args,
602            Err(_) => continue,
603        };
604
605        let of_dimension = args
606            .iter()
607            .find(|(k, _)| k.as_str() == "of")
608            .and_then(|(_, v)| match v {
609                async_graphql::Value::Enum(e) => Some(e.to_string()),
610                async_graphql::Value::String(s) => Some(s.clone()),
611                _ => None,
612            })
613            .unwrap_or_else(|| "*".to_string());
614
615        let select_where_value = args
616            .iter()
617            .find(|(k, _)| k.as_str() == "selectWhere")
618            .map(|(_, v)| v.clone());
619
620        let condition_filter = args
621            .iter()
622            .find(|(k, _)| k.as_str() == "if")
623            .and_then(|(_, v)| {
624                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
625                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
626            });
627
628        let gql_alias = sub_field.alias().unwrap_or(name).to_string();
629        requests.push(compiler::parser::MetricRequest {
630            function: name.to_string(),
631            alias: gql_alias,
632            of_dimension,
633            select_where_value,
634            condition_filter,
635        });
636    }
637
638    requests
639}
640
641fn extract_requested_fields(
642    ctx: &async_graphql::dynamic::ResolverContext,
643    cube: &CubeDefinition,
644) -> HashSet<String> {
645    let mut fields = HashSet::new();
646    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
647    fields
648}
649
650fn collect_selection_paths(
651    field: &async_graphql::SelectionField<'_>,
652    prefix: &str,
653    out: &mut HashSet<String>,
654    metrics: &[MetricDef],
655) {
656    for sub in field.selection_set() {
657        let name = sub.name();
658        if metrics.iter().any(|m| m.name == name) {
659            continue;
660        }
661        let path = if prefix.is_empty() {
662            name.to_string()
663        } else {
664            format!("{prefix}_{name}")
665        };
666        let has_children = sub.selection_set().next().is_some();
667        if has_children {
668            collect_selection_paths(&sub, &path, out, metrics);
669        } else {
670            out.insert(path);
671        }
672    }
673}
674
/// Maps `{parent_path}_{graphql_alias}` → ClickHouse column name for aliased dimension fields.
/// Used by `calculate` to resolve `$TokenSupplyUpdate_post` → column.
///
/// Stored as a list of `(alias_path, column)` pairs in selection order; for a
/// top-level field the alias path is just the GraphQL alias. Only leaf fields
/// that carry an alias and match a known dimension path are recorded.
pub type FieldAliasMap = Vec<(String, String)>;
678
/// Describes a `quantile(of: ..., level: ...)` request.
///
/// Derives `Debug` and `Clone` so requests can be logged and copied.
#[derive(Debug, Clone)]
pub struct QuantileRequest {
    /// GraphQL alias of the field, or `"quantile"` when none was given.
    pub alias: String,
    /// Value of the `of` argument; `"*"` when it is missing or not an
    /// enum/string value.
    pub of_dimension: String,
    /// Value of the `level` argument; `0.5` when it is missing or not numeric.
    pub level: f64,
}
685
/// Describes a `calculate(expression: "...")` request.
///
/// Derives `Debug` and `Clone` so requests can be logged and copied.
#[derive(Debug, Clone)]
pub struct CalculateRequest {
    /// Name under which the computed value is requested.
    pub alias: String,
    /// The expression text supplied by the query.
    pub expression: String,
}
691
/// Describes a time interval bucketing request.
/// `Time(interval: {in: minutes, count: 1})` → replaces column with `toStartOfInterval(ts, ...)`
///
/// Derives `Debug` and `Clone` so requests can be logged and copied.
#[derive(Debug, Clone)]
pub struct TimeIntervalRequest {
    /// Path of the bucketed field within the selection.
    pub field_path: String,
    /// GraphQL alias of the bucketed field.
    pub graphql_alias: String,
    /// Column the interval is applied to.
    pub column: String,
    /// Interval unit (the `in` value, e.g. `minutes`).
    pub unit: String,
    /// Number of units per bucket.
    pub count: i64,
}
701
/// Describes a dimension-level aggregation request extracted from the selection set.
/// `PostBalance(maximum: Block_Slot)` → DimAggRequest { agg_type: ArgMax, ... }
pub struct DimAggRequest {
    /// Path of the aggregated field within the selection.
    pub field_path: String,
    /// GraphQL alias of the aggregated field.
    pub graphql_alias: String,
    /// Column whose value is returned by the aggregation.
    pub value_column: String,
    /// Aggregation kind (e.g. `ArgMax` for `maximum:`).
    pub agg_type: DimAggType,
    /// Column the aggregation compares on (`Block_Slot` in the example above).
    pub compare_column: String,
    /// Optional filter attached to this aggregation.
    /// NOTE(review): presumably parsed from an `if:` argument, as for metrics — confirm.
    pub condition_filter: Option<FilterNode>,
    /// Optional raw GraphQL value for a post-aggregation filter.
    /// NOTE(review): presumably the `selectWhere` argument — confirm.
    pub select_where_value: Option<async_graphql::Value>,
}
713
714fn extract_quantile_requests(
715    ctx: &async_graphql::dynamic::ResolverContext,
716) -> Vec<QuantileRequest> {
717    let mut requests = Vec::new();
718    for sub_field in ctx.ctx.field().selection_set() {
719        if sub_field.name() != "quantile" { continue; }
720        let args = match sub_field.arguments() {
721            Ok(a) => a,
722            Err(_) => continue,
723        };
724        let of_dim = args.iter()
725            .find(|(k, _)| k.as_str() == "of")
726            .and_then(|(_, v)| match v {
727                async_graphql::Value::Enum(e) => Some(e.to_string()),
728                async_graphql::Value::String(s) => Some(s.clone()),
729                _ => None,
730            })
731            .unwrap_or_else(|| "*".to_string());
732        let level = args.iter()
733            .find(|(k, _)| k.as_str() == "level")
734            .and_then(|(_, v)| match v {
735                async_graphql::Value::Number(n) => n.as_f64(),
736                _ => None,
737            })
738            .unwrap_or(0.5);
739        let alias = sub_field.alias().unwrap_or("quantile").to_string();
740        requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
741    }
742    requests
743}
744
745fn extract_field_aliases(
746    ctx: &async_graphql::dynamic::ResolverContext,
747    cube: &CubeDefinition,
748) -> FieldAliasMap {
749    let flat = cube.flat_dimensions();
750    let mut aliases = Vec::new();
751    collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
752    aliases
753}
754
755fn collect_field_aliases(
756    field: &async_graphql::SelectionField<'_>,
757    prefix: &str,
758    flat: &[(String, crate::cube::definition::Dimension)],
759    metrics: &[MetricDef],
760    out: &mut FieldAliasMap,
761) {
762    for sub in field.selection_set() {
763        let name = sub.name();
764        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
765            continue;
766        }
767        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
768        let has_children = sub.selection_set().next().is_some();
769        if has_children {
770            collect_field_aliases(&sub, &path, flat, metrics, out);
771        } else if let Some(alias) = sub.alias() {
772            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
773                let alias_path = if prefix.is_empty() {
774                    alias.to_string()
775                } else {
776                    format!("{prefix}_{alias}")
777                };
778                out.push((alias_path, dim.column.clone()));
779            }
780        }
781    }
782}
783
784fn extract_calculate_requests(
785    ctx: &async_graphql::dynamic::ResolverContext,
786) -> Vec<CalculateRequest> {
787    let mut requests = Vec::new();
788    for sub_field in ctx.ctx.field().selection_set() {
789        if sub_field.name() != "calculate" { continue; }
790        let args = match sub_field.arguments() {
791            Ok(a) => a,
792            Err(_) => continue,
793        };
794        let expression = args.iter()
795            .find(|(k, _)| k.as_str() == "expression")
796            .and_then(|(_, v)| match v {
797                async_graphql::Value::String(s) => Some(s.clone()),
798                _ => None,
799            });
800        if let Some(expr) = expression {
801            let alias = sub_field.alias().unwrap_or("calculate").to_string();
802            requests.push(CalculateRequest { alias, expression: expr });
803        }
804    }
805    requests
806}
807
808fn extract_dim_agg_requests(
809    ctx: &async_graphql::dynamic::ResolverContext,
810    cube: &CubeDefinition,
811) -> Vec<DimAggRequest> {
812    let flat = cube.flat_dimensions();
813    let mut requests = Vec::new();
814    collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
815    requests
816}
817
818fn collect_dim_agg_paths(
819    field: &async_graphql::SelectionField<'_>,
820    prefix: &str,
821    flat: &[(String, crate::cube::definition::Dimension)],
822    metrics: &[MetricDef],
823    cube: &CubeDefinition,
824    out: &mut Vec<DimAggRequest>,
825) {
826    for sub in field.selection_set() {
827        let name = sub.name();
828        if metrics.iter().any(|m| m.name == name) {
829            continue;
830        }
831        let path = if prefix.is_empty() {
832            name.to_string()
833        } else {
834            format!("{prefix}_{name}")
835        };
836        let has_children = sub.selection_set().next().is_some();
837        if has_children {
838            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
839        } else {
840            let args = match sub.arguments() {
841                Ok(a) => a,
842                Err(_) => continue,
843            };
844            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
845            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
846
847            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
848                let cp = match v {
849                    async_graphql::Value::Enum(e) => e.to_string(),
850                    async_graphql::Value::String(s) => s.clone(),
851                    _ => continue,
852                };
853                (DimAggType::ArgMax, cp)
854            } else if let Some((_, v)) = min_val {
855                let cp = match v {
856                    async_graphql::Value::Enum(e) => e.to_string(),
857                    async_graphql::Value::String(s) => s.clone(),
858                    _ => continue,
859                };
860                (DimAggType::ArgMin, cp)
861            } else {
862                continue;
863            };
864
865            let value_column = flat.iter()
866                .find(|(p, _)| p == &path)
867                .map(|(_, dim)| dim.column.clone());
868            let compare_column = flat.iter()
869                .find(|(p, _)| p == &compare_path)
870                .map(|(_, dim)| dim.column.clone());
871
872            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
873                let condition_filter = args.iter()
874                    .find(|(k, _)| k.as_str() == "if")
875                    .and_then(|(_, v)| {
876                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
877                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
878                    });
879                let select_where_value = args.iter()
880                    .find(|(k, _)| k.as_str() == "selectWhere")
881                    .map(|(_, v)| v.clone());
882
883                let gql_alias = sub.alias()
884                    .map(|a| a.to_string())
885                    .unwrap_or_else(|| path.clone());
886                out.push(DimAggRequest {
887                    field_path: path,
888                    graphql_alias: gql_alias,
889                    value_column: vc,
890                    agg_type,
891                    compare_column: cc,
892                    condition_filter,
893                    select_where_value,
894                });
895            }
896        }
897    }
898}
899
900fn extract_time_interval_requests(
901    ctx: &async_graphql::dynamic::ResolverContext,
902    cube: &CubeDefinition,
903) -> Vec<TimeIntervalRequest> {
904    let flat = cube.flat_dimensions();
905    let mut requests = Vec::new();
906    collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
907    requests
908}
909
910fn collect_time_interval_paths(
911    field: &async_graphql::SelectionField<'_>,
912    prefix: &str,
913    flat: &[(String, crate::cube::definition::Dimension)],
914    metrics: &[MetricDef],
915    out: &mut Vec<TimeIntervalRequest>,
916) {
917    for sub in field.selection_set() {
918        let name = sub.name();
919        if metrics.iter().any(|m| m.name == name) { continue; }
920        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
921        let has_children = sub.selection_set().next().is_some();
922        if has_children {
923            collect_time_interval_paths(&sub, &path, flat, metrics, out);
924        } else {
925            let args = match sub.arguments() {
926                Ok(a) => a,
927                Err(_) => continue,
928            };
929            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
930            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
931                let unit = obj.get("in")
932                    .and_then(|v| match v {
933                        async_graphql::Value::Enum(e) => Some(e.to_string()),
934                        async_graphql::Value::String(s) => Some(s.clone()),
935                        _ => None,
936                    });
937                let count = obj.get("count")
938                    .and_then(|v| match v {
939                        async_graphql::Value::Number(n) => n.as_i64(),
940                        _ => None,
941                    });
942                if let (Some(unit), Some(count)) = (unit, count) {
943                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
944                        let gql_alias = sub.alias().unwrap_or(name).to_string();
945                        out.push(TimeIntervalRequest {
946                            field_path: path,
947                            graphql_alias: gql_alias,
948                            column: dim.column.clone(),
949                            unit,
950                            count,
951                        });
952                    }
953                }
954            }
955        }
956    }
957}
958
959// ---------------------------------------------------------------------------
960// Per-Cube GraphQL type generation
961// ---------------------------------------------------------------------------
962
/// The GraphQL types generated for a single cube, grouped by kind.
/// Produced by `build_cube_types`.
struct CubeTypes {
    /// The cube's record object first, followed by objects collected from the dimension tree.
    objects: Vec<Object>,
    /// The cube's filter and order-by inputs first, followed by the additional inputs
    /// (nested filters and per-metric select-where inputs).
    inputs: Vec<InputObject>,
    /// The compare-fields enum first, followed by the per-metric and quantile `of` enums.
    enums: Vec<Enum>,
    /// Union types collected from the dimension tree.
    unions: Vec<Union>,
}
969
970fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
971    let record_name = format!("{}Record", cube.name);
972    let filter_name = format!("{}Filter", cube.name);
973    let compare_enum_name = format!("{}CompareFields", cube.name);
974
975    let flat_dims = cube.flat_dimensions();
976
977    let mut compare_enum = Enum::new(&compare_enum_name)
978        .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
979    for (path, _) in &flat_dims {
980        compare_enum = compare_enum.item(EnumItem::new(path));
981    }
982
983    let mut record_fields: Vec<Field> = Vec::new();
984    let mut filter_fields: Vec<InputValue> = Vec::new();
985    let mut extra_objects: Vec<Object> = Vec::new();
986    let mut extra_inputs: Vec<InputObject> = Vec::new();
987    let mut extra_unions: Vec<Union> = Vec::new();
988
989    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
990        .description("OR combinator — matches if any sub-filter matches"));
991
992    {
993        let mut collector = DimCollector {
994            cube_name: &cube.name,
995            compare_enum_name: &compare_enum_name,
996            filter_name: &filter_name,
997            record_fields: &mut record_fields,
998            filter_fields: &mut filter_fields,
999            extra_objects: &mut extra_objects,
1000            extra_inputs: &mut extra_inputs,
1001            extra_unions: &mut extra_unions,
1002        };
1003        for node in &cube.dimensions {
1004            collect_dimension_types(node, "", &mut collector);
1005        }
1006    }
1007
1008    let mut metric_enums: Vec<Enum> = Vec::new();
1009    let builtin_descs: std::collections::HashMap<&str, &str> = [
1010        ("count", "Count of rows or distinct values"),
1011        ("sum", "Sum of values"),
1012        ("avg", "Average of values"),
1013        ("min", "Minimum value"),
1014        ("max", "Maximum value"),
1015        ("uniq", "Count of unique (distinct) values"),
1016    ].into_iter().collect();
1017
1018    for metric in &cube.metrics {
1019        let metric_name = &metric.name;
1020        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1021
1022        if metric.supports_where {
1023            extra_inputs.push(
1024                InputObject::new(&select_where_name)
1025                    .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1026                    .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1027                    .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1028                    .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1029                    .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1030                    .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1031            );
1032        }
1033
1034        let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1035        let mut of_enum = Enum::new(&of_enum_name)
1036            .description(format!("Dimension to apply {} aggregation on", metric_name));
1037        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1038        metric_enums.push(of_enum);
1039
1040        let metric_name_clone = metric_name.clone();
1041        let return_type_ref = dim_type_to_typeref(&metric.return_type);
1042        let metric_desc = metric.description.as_deref()
1043            .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1044            .unwrap_or("Aggregate metric");
1045
1046        let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1047            let default_name = metric_name_clone.clone();
1048            FieldFuture::new(async move {
1049                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1050                let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1051                let key = metric_key(alias);
1052                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1053                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1054            })
1055        })
1056        .description(metric_desc)
1057        .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1058            .description("Dimension to aggregate on (default: all rows)"));
1059
1060        if metric.supports_where {
1061            metric_field = metric_field
1062                .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1063                    .description("Post-aggregation filter (HAVING)"))
1064                .argument(InputValue::new("if", TypeRef::named(&filter_name))
1065                    .description("Conditional filter for this metric"));
1066        }
1067
1068        record_fields.push(metric_field);
1069    }
1070
1071    // `quantile(of: ..., level: ...)` — percentile computation
1072    {
1073        let of_enum_name = format!("{}_quantile_Of", cube.name);
1074        let mut of_enum = Enum::new(&of_enum_name)
1075            .description(format!("Dimension to apply quantile on for {}", cube.name));
1076        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1077        metric_enums.push(of_enum);
1078
1079        let of_enum_for_closure = of_enum_name.clone();
1080        let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1081            FieldFuture::new(async move {
1082                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1083                let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1084                let key = metric_key(alias);
1085                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1086                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1087            })
1088        })
1089        .description("Compute a quantile (percentile) of a dimension")
1090        .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1091            .description("Dimension to compute quantile on"))
1092        .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1093            .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1094        record_fields.push(quantile_field);
1095    }
1096
1097    // `calculate(expression: "...")` — runtime expression computation
1098    {
1099        let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1100            FieldFuture::new(async move {
1101                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1102                let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1103                let key = metric_key(alias);
1104                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1105                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1106            })
1107        })
1108        .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1109        .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1110            .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1111        record_fields.push(calculate_field);
1112    }
1113
1114    // Add join fields: joinXxx returns the target cube's Record type
1115    for jd in &cube.joins {
1116        let target_record_name = format!("{}Record", jd.target_cube);
1117        let field_name_owned = jd.field_name.clone();
1118        let mut join_field = Field::new(
1119            &jd.field_name,
1120            TypeRef::named(&target_record_name),
1121            move |ctx| {
1122                let field_name = field_name_owned.clone();
1123                FieldFuture::new(async move {
1124                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1125                    if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1126                        let sub_row: RowMap = obj.iter()
1127                            .map(|(k, v)| (k.clone(), v.clone()))
1128                            .collect();
1129                        Ok(Some(FieldValue::owned_any(sub_row)))
1130                    } else {
1131                        Ok(Some(FieldValue::value(Value::Null)))
1132                    }
1133                })
1134            },
1135        );
1136        if let Some(desc) = &jd.description {
1137            join_field = join_field.description(desc);
1138        }
1139        record_fields.push(join_field);
1140    }
1141
1142    let mut record = Object::new(&record_name);
1143    for f in record_fields { record = record.field(f); }
1144
1145    let mut filter = InputObject::new(&filter_name)
1146        .description(format!("Filter conditions for {} query", cube.name));
1147    for f in filter_fields { filter = filter.field(f); }
1148
1149    let orderby_input_name = format!("{}OrderByInput", cube.name);
1150    let orderby_input = InputObject::new(&orderby_input_name)
1151        .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1152        .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1153            .description("Sort descending by this field"))
1154        .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1155            .description("Sort ascending by this field"))
1156        .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1157            .description("Sort descending by computed/aggregated field name"))
1158        .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1159            .description("Sort ascending by computed/aggregated field name"));
1160
1161    let mut objects = vec![record]; objects.extend(extra_objects);
1162    let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
1163    let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1164
1165    CubeTypes { objects, inputs, enums, unions: extra_unions }
1166}
1167
/// Mutable state threaded through `collect_dimension_types` while it walks a
/// cube's dimension tree.
///
/// Nested groups get a child collector with fresh `record_fields` /
/// `filter_fields` but the same `extra_*` vectors, so all additional types end
/// up in one place.
struct DimCollector<'a> {
    /// Cube name; prefix of the generated nested type names.
    cube_name: &'a str,
    /// Name of the cube's compare-fields enum; type of the `maximum` / `minimum` arguments on leaf fields.
    compare_enum_name: &'a str,
    /// Name of the cube's root filter input; type of the `if` argument on leaf fields.
    filter_name: &'a str,
    /// Record fields of the level currently being walked.
    record_fields: &'a mut Vec<Field>,
    /// Filter input fields of the level currently being walked.
    filter_fields: &'a mut Vec<InputValue>,
    /// Additional object types, e.g. nested group records and union variant objects.
    extra_objects: &'a mut Vec<Object>,
    /// Additional input types, e.g. nested group filters.
    extra_inputs: &'a mut Vec<InputObject>,
    /// Union types created for array children.
    extra_unions: &'a mut Vec<Union>,
}
1178
1179fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1180    match node {
1181        DimensionNode::Leaf(dim) => {
1182            let col = dim.column.clone();
1183            let is_datetime = dim.dim_type == DimType::DateTime;
1184            let compare_enum = c.compare_enum_name.to_string();
1185            let cube_filter = c.filter_name.to_string();
1186            let mut leaf_field = Field::new(
1187                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1188                move |ctx| {
1189                    let col = col.clone();
1190                    FieldFuture::new(async move {
1191                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1192                        let has_interval = ctx.args.try_get("interval").is_ok();
1193                        let has_max = ctx.args.try_get("maximum").is_ok();
1194                        let has_min = ctx.args.try_get("minimum").is_ok();
1195                        let key = if has_interval || has_max || has_min {
1196                            let name = ctx.ctx.field().alias().unwrap_or(&col);
1197                            dim_agg_key(name)
1198                        } else {
1199                            col.clone()
1200                        };
1201                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1202                        let gql_val = if is_datetime {
1203                            json_to_gql_datetime(val)
1204                        } else {
1205                            json_to_gql_value(val)
1206                        };
1207                        Ok(Some(FieldValue::value(gql_val)))
1208                    })
1209                },
1210            );
1211            if let Some(desc) = &dim.description {
1212                leaf_field = leaf_field.description(desc);
1213            }
1214            leaf_field = leaf_field
1215                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1216                    .description("Return value from row where compare field is maximum (argMax)"))
1217                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1218                    .description("Return value from row where compare field is minimum (argMin)"))
1219                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1220                    .description("Conditional filter for aggregation"))
1221                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1222                    .description("Post-aggregation value filter (HAVING)"));
1223            if is_datetime {
1224                leaf_field = leaf_field
1225                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1226                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1227            }
1228            // Update resolver key: if interval is present, read from the interval expression key
1229            // This is handled by the resolver checking ctx.args for "interval"
1230            c.record_fields.push(leaf_field);
1231            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1232        }
1233        DimensionNode::Group { graphql_name, description, children } => {
1234            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1235            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1236            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1237
1238            let mut child_record_fields: Vec<Field> = Vec::new();
1239            let mut child_filter_fields: Vec<InputValue> = Vec::new();
1240            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1241
1242            let mut child_collector = DimCollector {
1243                cube_name: c.cube_name,
1244                compare_enum_name: c.compare_enum_name,
1245                filter_name: c.filter_name,
1246                record_fields: &mut child_record_fields,
1247                filter_fields: &mut child_filter_fields,
1248                extra_objects: c.extra_objects,
1249                extra_inputs: c.extra_inputs,
1250                extra_unions: c.extra_unions,
1251            };
1252            for child in children {
1253                collect_dimension_types(child, &new_prefix, &mut child_collector);
1254            }
1255
1256            let mut nested_record = Object::new(&nested_record_name);
1257            for f in child_record_fields { nested_record = nested_record.field(f); }
1258
1259            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1260            let mut nested_filter = InputObject::new(&nested_filter_name)
1261                .description(nested_filter_desc);
1262            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1263
1264            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1265                FieldFuture::new(async move {
1266                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1267                    Ok(Some(FieldValue::owned_any(row.clone())))
1268                })
1269            });
1270            if let Some(desc) = description {
1271                group_field = group_field.description(desc);
1272            }
1273            c.record_fields.push(group_field);
1274            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1275            c.extra_objects.push(nested_record);
1276            c.extra_inputs.push(nested_filter);
1277        }
1278        DimensionNode::Array { graphql_name, description, children } => {
1279            let full_path = if prefix.is_empty() {
1280                graphql_name.clone()
1281            } else {
1282                format!("{prefix}_{graphql_name}")
1283            };
1284            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1285            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1286
1287            let mut element_obj = Object::new(&element_type_name);
1288            let mut includes_filter = InputObject::new(&includes_filter_name)
1289                .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1290
1291            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1292
1293            for child in children {
1294                match &child.field_type {
1295                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1296                        let col_name = child.column.clone();
1297                        let is_datetime = *dt == DimType::DateTime;
1298                        let mut field = Field::new(
1299                            &child.graphql_name,
1300                            dim_type_to_typeref(dt),
1301                            move |ctx| {
1302                                let col = col_name.clone();
1303                                FieldFuture::new(async move {
1304                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1305                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1306                                    let gql_val = if is_datetime {
1307                                        json_to_gql_datetime(val)
1308                                    } else {
1309                                        json_to_gql_value(val)
1310                                    };
1311                                    Ok(Some(FieldValue::value(gql_val)))
1312                                })
1313                            },
1314                        );
1315                        if let Some(desc) = &child.description {
1316                            field = field.description(desc);
1317                        }
1318                        element_obj = element_obj.field(field);
1319                        includes_filter = includes_filter.field(
1320                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1321                        );
1322                    }
1323                    crate::cube::definition::ArrayFieldType::Union(variants) => {
1324                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1325
1326                        let mut gql_union = Union::new(&union_name);
1327                        let mut variant_objects = Vec::new();
1328
1329                        for v in variants {
1330                            let variant_obj_name = v.type_name.clone();
1331                            let field_name = v.field_name.clone();
1332                            let source_type = v.source_type.clone();
1333                            let type_ref = dim_type_to_typeref(&source_type);
1334
1335                            let val_col = child.column.clone();
1336                            let variant_obj = Object::new(&variant_obj_name)
1337                                .field(Field::new(
1338                                    &field_name,
1339                                    type_ref,
1340                                    move |ctx| {
1341                                        let col = val_col.clone();
1342                                        FieldFuture::new(async move {
1343                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1344                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1345                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
1346                                        })
1347                                    },
1348                                ));
1349                            gql_union = gql_union.possible_type(&variant_obj_name);
1350                            variant_objects.push(variant_obj);
1351                        }
1352
1353                        let col_name = child.column.clone();
1354                        let type_col = children.iter()
1355                            .find(|f| f.graphql_name == "Type")
1356                            .map(|f| f.column.clone())
1357                            .unwrap_or_default();
1358                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1359
1360                        let mut field = Field::new(
1361                            &child.graphql_name,
1362                            TypeRef::named(&union_name),
1363                            move |ctx| {
1364                                let col = col_name.clone();
1365                                let tcol = type_col.clone();
1366                                let vars = variants_clone.clone();
1367                                FieldFuture::new(async move {
1368                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1369                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1370                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
1371                                        return Ok(None);
1372                                    }
1373                                    let type_str = row.get(&tcol)
1374                                        .and_then(|v| v.as_str())
1375                                        .unwrap_or("");
1376                                    let resolved_type = resolve_union_typename(type_str, &vars);
1377                                    let mut elem = RowMap::new();
1378                                    elem.insert(col.clone(), raw_val);
1379                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1380                                })
1381                            },
1382                        );
1383                        if let Some(desc) = &child.description {
1384                            field = field.description(desc);
1385                        }
1386                        element_obj = element_obj.field(field);
1387
1388                        includes_filter = includes_filter.field(
1389                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1390                        );
1391
1392                        union_registrations.push((union_name, gql_union, variant_objects));
1393                    }
1394                }
1395            }
1396
1397            // Register union types and variant objects
1398            for (_, union_type, variant_objs) in union_registrations {
1399                c.extra_objects.extend(variant_objs);
                // Union types cannot be stored as objects, so they are collected
                // separately in `extra_unions`; only their variant objects (above)
                // go into `extra_objects`.
1407                c.extra_unions.push(union_type);
1408            }
1409
1410            // Build the list resolver: zip parallel array columns into element objects
1411            let child_columns: Vec<(String, String)> = children.iter()
1412                .map(|f| (f.graphql_name.clone(), f.column.clone()))
1413                .collect();
1414            let element_type_name_clone = element_type_name.clone();
1415
1416            let mut array_field = Field::new(
1417                graphql_name,
1418                TypeRef::named_nn_list_nn(&element_type_name),
1419                move |ctx| {
1420                    let cols = child_columns.clone();
1421                    let _etype = element_type_name_clone.clone();
1422                    FieldFuture::new(async move {
1423                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1424                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1425                            .map(|(gql_name, col)| {
1426                                let arr = row.get(col)
1427                                    .and_then(|v| v.as_array())
1428                                    .cloned()
1429                                    .unwrap_or_default();
1430                                (gql_name.as_str(), arr)
1431                            })
1432                            .collect();
1433
1434                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1435                        let mut elements = Vec::with_capacity(len);
1436                        for i in 0..len {
1437                            let mut elem = RowMap::new();
1438                            for (gql_name, arr) in &arrays {
1439                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1440                                elem.insert(gql_name.to_string(), val);
1441                            }
1442                            // Also store by column name for Union resolvers
1443                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1444                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1445                                elem.insert(col.clone(), val);
1446                            }
1447                            elements.push(FieldValue::owned_any(elem));
1448                        }
1449                        Ok(Some(FieldValue::list(elements)))
1450                    })
1451                },
1452            );
1453            if let Some(desc) = description {
1454                array_field = array_field.description(desc);
1455            }
1456            c.record_fields.push(array_field);
1457
1458            // Wrap the element-level IncludesFilter in an ArrayFilter so the
1459            // parser can find the `includes` key (compiler/filter.rs expects
1460            // `{ includes: [{ Name: { is: "..." } }] }`).
1461            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
1462            let wrapper_filter = InputObject::new(&wrapper_filter_name)
1463                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
1464                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
1465                    .description("Match rows where at least one array element satisfies all conditions"));
1466            c.extra_inputs.push(wrapper_filter);
1467
1468            c.filter_fields.push(InputValue::new(
1469                graphql_name,
1470                TypeRef::named(&wrapper_filter_name),
1471            ));
1472
1473            c.extra_objects.push(element_obj);
1474            c.extra_inputs.push(includes_filter);
1475        }
1476    }
1477}
1478
1479/// Resolve the GraphQL Union __typename from an ABI type string.
1480fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1481    let fallback = variants.last().map(|v| v.type_name.clone()).unwrap_or_default();
1482    match type_str {
1483        "u8" | "u16" | "u32" | "i32" => {
1484            variants.iter().find(|v| v.field_name == "integer")
1485                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1486        }
1487        "u64" | "u128" | "i64" => {
1488            variants.iter().find(|v| v.field_name == "bigInteger")
1489                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1490        }
1491        "string" => {
1492            variants.iter().find(|v| v.field_name == "string")
1493                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1494        }
1495        "publicKey" | "pubkey" => {
1496            variants.iter().find(|v| v.field_name == "address")
1497                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1498        }
1499        "bool" => {
1500            variants.iter().find(|v| v.field_name == "bool")
1501                .map(|v| v.type_name.clone()).unwrap_or_else(|| fallback.clone())
1502        }
1503        _ => fallback,
1504    }
1505}
1506
1507fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1508    match dt {
1509        DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
1510        DimType::Int => TypeRef::named(TypeRef::INT),
1511        DimType::Float => TypeRef::named(TypeRef::FLOAT),
1512        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1513    }
1514}
1515
1516fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1517    match dt {
1518        DimType::String => "StringFilter",
1519        DimType::Int => "IntFilter",
1520        DimType::Float => "FloatFilter",
1521        DimType::DateTime => "DateTimeFilter",
1522        DimType::Bool => "BoolFilter",
1523    }
1524}
1525
1526pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1527    match v {
1528        serde_json::Value::Null => Value::Null,
1529        serde_json::Value::Bool(b) => Value::from(b),
1530        serde_json::Value::Number(n) => {
1531            if let Some(i) = n.as_i64() { Value::from(i) }
1532            else if let Some(f) = n.as_f64() { Value::from(f) }
1533            else { Value::from(n.to_string()) }
1534        }
1535        serde_json::Value::String(s) => Value::from(s),
1536        _ => Value::from(v.to_string()),
1537    }
1538}
1539
1540/// Build a JoinExpr from a JoinDef and target cube definition.
1541/// Inspects the sub-selection of the join field to determine which columns to SELECT.
1542fn build_join_expr(
1543    jd: &crate::cube::definition::JoinDef,
1544    target_cube: &CubeDefinition,
1545    sub_field: &async_graphql::SelectionField<'_>,
1546    network: &str,
1547    join_idx: usize,
1548) -> JoinExpr {
1549    let target_flat = target_cube.flat_dimensions();
1550    let target_table = target_cube.table_for_chain(network);
1551
1552    let mut requested_paths = HashSet::new();
1553    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1554
1555    let mut selects: Vec<SelectExpr> = target_flat.iter()
1556        .filter(|(path, _)| requested_paths.contains(path))
1557        .map(|(_, dim)| SelectExpr::Column {
1558            column: dim.column.clone(),
1559            alias: None,
1560        })
1561        .collect();
1562
1563    if selects.is_empty() {
1564        selects = target_flat.iter()
1565            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1566            .collect();
1567    }
1568
1569    let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
1570
1571    let group_by = if is_aggregate {
1572        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1573        for sel in &selects {
1574            if let SelectExpr::Column { column, .. } = sel {
1575                if !column.contains('(') && !gb.contains(column) {
1576                    gb.push(column.clone());
1577                }
1578            }
1579        }
1580        gb
1581    } else {
1582        vec![]
1583    };
1584
1585    JoinExpr {
1586        schema: target_cube.schema.clone(),
1587        table: target_table,
1588        alias: format!("_j{}", join_idx),
1589        conditions: jd.conditions.clone(),
1590        selects,
1591        group_by,
1592        use_final: target_cube.use_final,
1593        is_aggregate,
1594        target_cube: jd.target_cube.clone(),
1595        join_field: sub_field.name().to_string(),
1596        join_type: jd.join_type.clone(),
1597    }
1598}
1599
1600/// Convert a ClickHouse DateTime value to ISO 8601 format.
1601/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
1602fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1603    match v {
1604        serde_json::Value::String(s) => {
1605            let iso = if s.contains('T') {
1606                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1607            } else {
1608                let replaced = s.replacen(' ', "T", 1);
1609                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1610            };
1611            Value::from(iso)
1612        }
1613        other => json_to_gql_value(other),
1614    }
1615}