Skip to main content

activecube_rs/schema/
generator.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr, is_aggregate_expr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
15/// Canonical key naming for metric SQL aliases. Used by parser and resolver.
16pub fn metric_key(alias: &str) -> String { format!("__{alias}") }
17
18/// Canonical key naming for dimension aggregate / time interval SQL aliases.
19pub fn dim_agg_key(alias: &str) -> String { format!("__da_{alias}") }
20
21/// Async function type that executes a compiled SQL query and returns rows.
22/// The service layer provides this — the library never touches a database directly.
23pub type QueryExecutor = Arc<
24    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
25        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
26    > + Send + Sync,
27>;
28
29/// A custom argument on the chain wrapper field (e.g. dataset, aggregates).
30/// The library registers it on the GraphQL wrapper type and stores the resolved
31/// value in `ChainContext.extra` for child cube resolvers to read.
32#[derive(Debug, Clone)]
33pub struct WrapperArg {
34    /// GraphQL argument name (e.g. "dataset").
35    pub name: String,
36    /// Name of a type already registered in the schema (e.g. "Dataset").
37    pub type_ref: String,
38    /// Description shown in schema introspection.
39    pub description: String,
40    /// Enum values, used by builder schema generation. None for non-enum types.
41    pub values: Option<Vec<String>>,
42}
43
44/// Describes one top-level chain wrapper type (e.g. EVM, Solana, Trading).
45#[derive(Debug, Clone)]
46pub struct ChainGroupConfig {
47    /// Wrapper type name in the schema, e.g. "EVM", "Solana", "Trading".
48    pub name: String,
49    /// Which ChainGroup enum value this config represents.
50    pub group: ChainGroup,
51    /// Available networks. EVM: ["eth","bsc"]; Solana: ["sol"]; Trading: all chains.
52    pub networks: Vec<String>,
53    /// If true the wrapper type takes a `network` argument (EVM style).
54    /// If false the chain is implicit (Solana style) or cross-chain (Trading).
55    pub has_network_arg: bool,
56    /// Additional arguments on the wrapper field, passed through to child resolvers
57    /// via `ChainContext.extra`.
58    pub extra_args: Vec<WrapperArg>,
59    /// Custom name for the network enum type registered in the schema.
60    /// Defaults to `"{name}Network"` (e.g. `EVMNetwork`) when `None`.
61    /// Set to e.g. `Some("evm_network".into())` for Bitquery-compatible naming.
62    pub network_enum_name: Option<String>,
63}
64
65/// Callback to transform the resolved table name based on `ChainContext`.
66/// Called after `resolve_table` picks the base table, before SQL compilation.
67/// Arguments: `(table_name, chain_context) -> transformed_table_name`.
68pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
69
70/// Configuration for supported networks (chains) and optional stats collection.
71pub struct SchemaConfig {
72    pub chain_groups: Vec<ChainGroupConfig>,
73    pub root_query_name: String,
74    /// Optional callback invoked after each cube query with execution metadata.
75    /// Used by application layer for billing, observability, etc.
76    pub stats_callback: Option<StatsCallback>,
77    /// Optional hook to transform table names based on chain context (e.g. dataset suffix).
78    pub table_name_transform: Option<TableNameTransform>,
79    /// Additional global enum types to register in the schema (e.g. Dataset, Aggregates).
80    pub extra_types: Vec<Enum>,
81}
82
83impl Default for SchemaConfig {
84    fn default() -> Self {
85        Self {
86            chain_groups: vec![
87                ChainGroupConfig {
88                    name: "EVM".to_string(),
89                    group: ChainGroup::Evm,
90                    networks: vec!["eth".into(), "bsc".into()],
91                    has_network_arg: true,
92                    extra_args: vec![],
93                    network_enum_name: None,
94                },
95                ChainGroupConfig {
96                    name: "Solana".to_string(),
97                    group: ChainGroup::Solana,
98                    networks: vec!["sol".into()],
99                    has_network_arg: false,
100                    extra_args: vec![],
101                    network_enum_name: None,
102                },
103                ChainGroupConfig {
104                    name: "Trading".to_string(),
105                    group: ChainGroup::Trading,
106                    networks: vec!["sol".into(), "eth".into(), "bsc".into()],
107                    has_network_arg: false,
108                    extra_args: vec![],
109                    network_enum_name: None,
110                },
111            ],
112            root_query_name: "ChainStream".to_string(),
113            stats_callback: None,
114            table_name_transform: None,
115            extra_types: vec![],
116        }
117    }
118}
119
120/// Stored in the parent value of chain wrapper types so cube resolvers can
121/// retrieve the active chain without a `network` argument on themselves.
122/// Service-layer extra_args values are available in the `extra` map.
123pub struct ChainContext {
124    pub network: String,
125    pub extra: HashMap<String, String>,
126}
127
128/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
129///
130/// Schema shape (Bitquery-aligned):
131/// ```graphql
132/// type Query {
133///   EVM(network: EVMNetwork!): EVM!
134///   Solana: Solana!
135///   Trading: Trading!
136///   _cubeMetadata: String!
137/// }
138/// ```
139pub fn build_schema(
140    registry: CubeRegistry,
141    dialect: Arc<dyn SqlDialect>,
142    executor: QueryExecutor,
143    config: SchemaConfig,
144) -> Result<Schema, SchemaError> {
145    let mut builder = Schema::build("Query", None, None);
146
147    // --- shared input / enum types -------------------------------------------
148    builder = builder.register(filter_types::build_limit_input());
149    // LimitByInput is now generated per-cube in build_cube_types() with typed enum for `by`
150    builder = builder.register(
151        Enum::new("OrderDirection")
152            .description("Sort direction")
153            .item(EnumItem::new("ASC").description("Ascending"))
154            .item(EnumItem::new("DESC").description("Descending")),
155    );
156    for input in filter_types::build_filter_primitives() {
157        builder = builder.register(input);
158    }
159    builder = builder.register(
160        Scalar::new("DateTime")
161            .description("ISO 8601 date-time string (e.g. \"2026-04-01T00:00:00Z\")"),
162    );
163    builder = builder.register(
164        Scalar::new("ISO8601DateTime")
165            .description("ISO 8601 date-time string (alias for DateTime, V1 compatibility)"),
166    );
167    builder = builder.register(
168        Enum::new("TimeUnit")
169            .description("Time unit for interval bucketing")
170            .item(EnumItem::new("seconds"))
171            .item(EnumItem::new("minutes"))
172            .item(EnumItem::new("hours"))
173            .item(EnumItem::new("days"))
174            .item(EnumItem::new("weeks"))
175            .item(EnumItem::new("months")),
176    );
177    builder = builder.register(
178        InputObject::new("TimeIntervalInput")
179            .description("Time bucketing interval for DateTime dimensions")
180            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
181            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
182    );
183    builder = builder.register(
184        InputObject::new("DimSelectWhere")
185            .description("Post-aggregation filter for dimension values (HAVING clause)")
186            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
187            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
188            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
189            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
190            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
191            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
192    );
193
194    for extra_enum in config.extra_types {
195        builder = builder.register(extra_enum);
196    }
197
198    // --- per-group network enums ---------------------------------------------
199    for grp in &config.chain_groups {
200        if grp.has_network_arg {
201            let enum_name = grp.network_enum_name.clone()
202                .unwrap_or_else(|| format!("{}Network", grp.name));
203            let mut net_enum = Enum::new(&enum_name)
204                .description(format!("{} network selector", grp.name));
205            for net in &grp.networks {
206                net_enum = net_enum.item(EnumItem::new(net));
207            }
208            builder = builder.register(net_enum);
209        }
210    }
211
212    // --- register cube types once (shared across chain groups) ----------------
213    let mut registered_cubes: HashSet<String> = HashSet::new();
214    for cube in registry.cubes() {
215        if registered_cubes.contains(&cube.name) { continue; }
216        registered_cubes.insert(cube.name.clone());
217
218        let types = build_cube_types(cube);
219        for obj in types.objects { builder = builder.register(obj); }
220        for inp in types.inputs { builder = builder.register(inp); }
221        for en in types.enums { builder = builder.register(en); }
222        for un in types.unions { builder = builder.register(un); }
223    }
224
225    // --- build chain wrapper types + Query fields ----------------------------
226    let mut query = Object::new("Query");
227
228    for grp in &config.chain_groups {
229        let wrapper_type_name = grp.name.clone();
230
231        // Build the wrapper Object (e.g. "EVM", "Solana", "Trading") and attach
232        // cube fields that belong to this group.
233        let mut wrapper_obj = Object::new(&wrapper_type_name);
234
235        for cube in registry.cubes() {
236            if !cube.chain_groups.contains(&grp.group) { continue; }
237
238            let cube_field = build_cube_field(
239                cube,
240                dialect.clone(),
241                executor.clone(),
242                config.stats_callback.clone(),
243            );
244            wrapper_obj = wrapper_obj.field(cube_field);
245        }
246        builder = builder.register(wrapper_obj);
247
248        // Wrapper resolver on Query — stores ChainContext for child resolvers.
249        let default_network = grp.networks.first().cloned().unwrap_or_default();
250        let has_net_arg = grp.has_network_arg;
251        let net_enum_name = grp.network_enum_name.clone()
252            .unwrap_or_else(|| format!("{}Network", grp.name));
253        let wrapper_type_for_resolver = wrapper_type_name.clone();
254        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();
255
256        let mut wrapper_field = Field::new(
257            &wrapper_type_name,
258            TypeRef::named_nn(&wrapper_type_name),
259            move |ctx| {
260                let default = default_network.clone();
261                let has_arg = has_net_arg;
262                let arg_names = extra_arg_names.clone();
263                FieldFuture::new(async move {
264                    let network = if has_arg {
265                        ctx.args.try_get("network")
266                            .ok()
267                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
268                            .unwrap_or(default)
269                    } else {
270                        default
271                    };
272                    let mut extra = HashMap::new();
273                    for arg_name in &arg_names {
274                        if let Ok(val) = ctx.args.try_get(arg_name) {
275                            let resolved = val.enum_name().ok().map(|s| s.to_string())
276                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
277                                .or_else(|| val.string().ok().map(|s| s.to_string()));
278                            if let Some(v) = resolved {
279                                extra.insert(arg_name.clone(), v);
280                            }
281                        }
282                    }
283                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
284                })
285            },
286        );
287
288        if grp.has_network_arg {
289            let net_type_ref = if grp.networks.len() > 1 {
290                TypeRef::named_nn(&net_enum_name) // required when multiple networks
291            } else {
292                TypeRef::named(&net_enum_name) // optional when single network (defaults to first)
293            };
294            wrapper_field = wrapper_field.argument(
295                InputValue::new("network", net_type_ref)
296                    .description(format!("{} network to query", wrapper_type_for_resolver)),
297            );
298        }
299        for wa in &grp.extra_args {
300            wrapper_field = wrapper_field.argument(
301                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
302                    .description(&wa.description),
303            );
304        }
305
306        query = query.field(wrapper_field);
307    }
308
309    // --- _cubeMetadata -------------------------------------------------------
310    let metadata_registry = Arc::new(registry.clone());
311    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
312        .map(|g| (g.name.clone(), g.group.clone()))
313        .collect();
314    let metadata_field = Field::new(
315        "_cubeMetadata",
316        TypeRef::named_nn(TypeRef::STRING),
317        move |_ctx| {
318            let reg = metadata_registry.clone();
319            let groups = metadata_groups.clone();
320            FieldFuture::new(async move {
321                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
322                for (group_name, group_enum) in &groups {
323                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
324                        .filter(|c| c.chain_groups.contains(group_enum))
325                        .map(serialize_cube_metadata)
326                        .collect();
327                    group_metadata.push(serde_json::json!({
328                        "group": group_name,
329                        "cubes": cubes_in_group,
330                    }));
331                }
332                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
333                Ok(Some(FieldValue::value(Value::from(json))))
334            })
335        },
336    )
337    .description("Internal: returns JSON metadata about all cubes grouped by chain");
338    query = query.field(metadata_field);
339
340    builder = builder.register(query);
341    builder = builder.data(registry);
342
343    if let Some(transform) = config.table_name_transform.clone() {
344        builder = builder.data(transform);
345    }
346
347    builder.finish()
348}
349
350fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
351    serde_json::json!({
352        "name": cube.name,
353        "description": cube.description,
354        "schema": cube.schema,
355        "tablePattern": cube.table_pattern,
356        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
357        "metrics": cube.metrics.iter().map(|m| {
358            let mut obj = serde_json::json!({
359                "name": m.name,
360                "returnType": format!("{:?}", m.return_type),
361            });
362            if let Some(ref tmpl) = m.expression_template {
363                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
364            }
365            if let Some(ref desc) = m.description {
366                obj["description"] = serde_json::Value::String(desc.clone());
367            }
368            obj
369        }).collect::<Vec<_>>(),
370        "selectors": cube.selectors.iter().map(|s| {
371            serde_json::json!({
372                "name": s.graphql_name,
373                "column": s.column,
374                "type": format!("{:?}", s.dim_type),
375            })
376        }).collect::<Vec<_>>(),
377        "dimensions": serialize_dims(&cube.dimensions),
378        "joins": cube.joins.iter().map(|j| {
379            serde_json::json!({
380                "field": j.field_name,
381                "target": j.target_cube,
382                "joinType": format!("{:?}", j.join_type),
383            })
384        }).collect::<Vec<_>>(),
385        "tableRoutes": cube.table_routes.iter().map(|r| {
386            serde_json::json!({
387                "schema": r.schema,
388                "tablePattern": r.table_pattern,
389                "availableColumns": r.available_columns,
390                "priority": r.priority,
391            })
392        }).collect::<Vec<_>>(),
393        "defaultLimit": cube.default_limit,
394        "maxLimit": cube.max_limit,
395    })
396}
397
398/// Build a single cube Field that reads `network` from the parent ChainContext.
399fn build_cube_field(
400    cube: &CubeDefinition,
401    dialect: Arc<dyn SqlDialect>,
402    executor: QueryExecutor,
403    stats_cb: Option<StatsCallback>,
404) -> Field {
405    let cube_name = cube.name.clone();
406    let orderby_input_name = format!("{}OrderByInput", cube.name);
407    let cube_description = cube.description.clone();
408
409    let mut field = Field::new(
410        &cube.name,
411        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
412        move |ctx| {
413            let cube_name = cube_name.clone();
414            let dialect = dialect.clone();
415            let executor = executor.clone();
416            let stats_cb = stats_cb.clone();
417            FieldFuture::new(async move {
418                let registry = ctx.ctx.data::<CubeRegistry>()?;
419
420                let chain_ctx = ctx.parent_value
421                    .try_downcast_ref::<ChainContext>().ok();
422                let network = chain_ctx
423                    .map(|c| c.network.as_str())
424                    .unwrap_or("sol");
425
426                let cube_def = registry.get(&cube_name).ok_or_else(|| {
427                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
428                })?;
429
430                let metric_requests = extract_metric_requests(&ctx, cube_def);
431                let quantile_requests = extract_quantile_requests(&ctx);
432                let calculate_requests = extract_calculate_requests(&ctx);
433                let field_aliases = extract_field_aliases(&ctx, cube_def);
434                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
435                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
436                let requested = extract_requested_fields(&ctx, cube_def);
437                let mut ir = compiler::parser::parse_cube_query(
438                    cube_def,
439                    network,
440                    &ctx.args,
441                    &metric_requests,
442                    &quantile_requests,
443                    &calculate_requests,
444                    &field_aliases,
445                    &dim_agg_requests,
446                    &time_intervals,
447                    Some(requested),
448                )?;
449
450                let mut join_idx = 0usize;
451                for sub_field in ctx.ctx.field().selection_set() {
452                    let fname = sub_field.name().to_string();
453                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
454                    if let Some(jd) = join_def {
455                        if let Some(target_cube) = registry.get(&jd.target_cube) {
456                            let join_expr = build_join_expr(
457                                jd, target_cube, &sub_field, network, join_idx,
458                            );
459                            ir.joins.push(join_expr);
460                            join_idx += 1;
461                        }
462                    }
463                }
464
465                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
466                    if let Some(cctx) = chain_ctx {
467                        ir.table = transform(&ir.table, cctx);
468                    }
469                }
470
471                let validated = compiler::validator::validate(ir)?;
472                let result = dialect.compile(&validated);
473                let sql = result.sql;
474                let bindings = result.bindings;
475
476                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
477                    async_graphql::Error::new(format!("Query execution failed: {e}"))
478                })?;
479
480                let rows = if result.alias_remap.is_empty() {
481                    rows
482                } else {
483                    rows.into_iter().map(|mut row| {
484                        for (alias, original) in &result.alias_remap {
485                            if let Some(val) = row.shift_remove(alias) {
486                                row.entry(original.clone()).or_insert(val);
487                            }
488                        }
489                        row
490                    }).collect()
491                };
492
493                let rows: Vec<RowMap> = if validated.joins.is_empty() {
494                    rows
495                } else {
496                    rows.into_iter().map(|mut row| {
497                        for join in &validated.joins {
498                            let prefix = format!("{}.", join.alias);
499                            let mut sub_row = RowMap::new();
500                            let keys: Vec<String> = row.keys()
501                                .filter(|k| k.starts_with(&prefix))
502                                .cloned()
503                                .collect();
504                            for key in keys {
505                                if let Some(val) = row.shift_remove(&key) {
506                                    sub_row.insert(key[prefix.len()..].to_string(), val);
507                                }
508                            }
509                            let obj: serde_json::Map<String, serde_json::Value> =
510                                sub_row.into_iter().collect();
511                            row.insert(
512                                join.join_field.clone(),
513                                serde_json::Value::Object(obj),
514                            );
515                        }
516                        row
517                    }).collect()
518                };
519
520                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
521                    .or_else(|| stats_cb.clone());
522                if let Some(cb) = effective_cb {
523                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
524                    cb(stats);
525                }
526
527                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
528                Ok(Some(FieldValue::list(values)))
529            })
530        },
531    );
532    if !cube_description.is_empty() {
533        field = field.description(&cube_description);
534    }
535    field = field
536        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
537            .description("Filter conditions"))
538        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
539            .description("Pagination control"))
540        .argument(InputValue::new("limitBy", TypeRef::named(format!("{}LimitByInput", cube.name)))
541            .description("Per-group row limit"))
542        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
543            .description("Sort order (Bitquery-compatible)"));
544
545    for sel in &cube.selectors {
546        let filter_type = dim_type_to_filter_name(&sel.dim_type);
547        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
548            .description(format!("Shorthand filter for {}", sel.graphql_name)));
549    }
550
551    field
552}
553
554fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
555    serde_json::Value::Array(dims.iter().map(|d| match d {
556        DimensionNode::Leaf(dim) => {
557            let mut obj = serde_json::json!({
558                "name": dim.graphql_name,
559                "column": dim.column,
560                "type": format!("{:?}", dim.dim_type),
561            });
562            if let Some(desc) = &dim.description {
563                obj["description"] = serde_json::Value::String(desc.clone());
564            }
565            obj
566        },
567        DimensionNode::Group { graphql_name, description, children } => {
568            let mut obj = serde_json::json!({
569                "name": graphql_name,
570                "children": serialize_dims(children),
571            });
572            if let Some(desc) = description {
573                obj["description"] = serde_json::Value::String(desc.clone());
574            }
575            obj
576        },
577        DimensionNode::Array { graphql_name, description, children } => {
578            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
579                let type_value = match &f.field_type {
580                    crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
581                        "kind": "scalar",
582                        "scalarType": format!("{:?}", dt),
583                    }),
584                    crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
585                        "kind": "union",
586                        "variants": variants.iter().map(|v| serde_json::json!({
587                            "typeName": v.type_name,
588                            "fieldName": v.field_name,
589                            "sourceType": format!("{:?}", v.source_type),
590                        })).collect::<Vec<_>>(),
591                    }),
592                };
593                let mut field_obj = serde_json::json!({
594                    "name": f.graphql_name,
595                    "column": f.column,
596                    "type": type_value,
597                });
598                if let Some(desc) = &f.description {
599                    field_obj["description"] = serde_json::Value::String(desc.clone());
600                }
601                field_obj
602            }).collect();
603            let mut obj = serde_json::json!({
604                "name": graphql_name,
605                "kind": "array",
606                "fields": fields,
607            });
608            if let Some(desc) = description {
609                obj["description"] = serde_json::Value::String(desc.clone());
610            }
611            obj
612        },
613    }).collect())
614}
615
616/// Extract metric requests from the GraphQL selection set by inspecting
617/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
618/// the "count" field in the selection set and extract its `of` argument.
619fn extract_metric_requests(
620    ctx: &async_graphql::dynamic::ResolverContext,
621    cube: &CubeDefinition,
622) -> Vec<compiler::parser::MetricRequest> {
623    let mut requests = Vec::new();
624
625    for sub_field in ctx.ctx.field().selection_set() {
626        let name = sub_field.name();
627        if !cube.has_metric(name) {
628            continue;
629        }
630
631        let args = match sub_field.arguments() {
632            Ok(args) => args,
633            Err(_) => continue,
634        };
635
636        let of_dimension = args
637            .iter()
638            .find(|(k, _)| k.as_str() == "of")
639            .or_else(|| args.iter().find(|(k, _)| k.as_str() == "distinct"))
640            .and_then(|(_, v)| match v {
641                async_graphql::Value::Enum(e) => Some(e.to_string()),
642                async_graphql::Value::String(s) => Some(s.clone()),
643                _ => None,
644            })
645            .unwrap_or_else(|| "*".to_string());
646
647        let select_where_value = args
648            .iter()
649            .find(|(k, _)| k.as_str() == "selectWhere")
650            .map(|(_, v)| v.clone());
651
652        let condition_filter = args
653            .iter()
654            .find(|(k, _)| k.as_str() == "if")
655            .and_then(|(_, v)| {
656                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
657                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
658            });
659
660        let gql_alias = sub_field.alias().unwrap_or(name).to_string();
661        requests.push(compiler::parser::MetricRequest {
662            function: name.to_string(),
663            alias: gql_alias,
664            of_dimension,
665            select_where_value,
666            condition_filter,
667        });
668    }
669
670    requests
671}
672
673fn extract_requested_fields(
674    ctx: &async_graphql::dynamic::ResolverContext,
675    cube: &CubeDefinition,
676) -> HashSet<String> {
677    let mut fields = HashSet::new();
678    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
679    fields
680}
681
682fn collect_selection_paths(
683    field: &async_graphql::SelectionField<'_>,
684    prefix: &str,
685    out: &mut HashSet<String>,
686    metrics: &[MetricDef],
687) {
688    for sub in field.selection_set() {
689        let name = sub.name();
690        if metrics.iter().any(|m| m.name == name) {
691            continue;
692        }
693        let path = if prefix.is_empty() {
694            name.to_string()
695        } else {
696            format!("{prefix}_{name}")
697        };
698        let has_children = sub.selection_set().next().is_some();
699        if has_children {
700            collect_selection_paths(&sub, &path, out, metrics);
701        } else {
702            out.insert(path);
703        }
704    }
705}
706
707/// Maps `{parent_path}_{graphql_alias}` → ClickHouse column name for aliased dimension fields.
708/// Used by `calculate` to resolve `$TokenSupplyUpdate_post` → column.
709pub type FieldAliasMap = Vec<(String, String)>;
710
711/// Describes a `quantile(of: ..., level: ...)` request.
712pub struct QuantileRequest {
713    pub alias: String,
714    pub of_dimension: String,
715    pub level: f64,
716}
717
718/// Describes a `calculate(expression: "...")` request.
719pub struct CalculateRequest {
720    pub alias: String,
721    pub expression: String,
722}
723
724/// Describes a time interval bucketing request.
725/// `Time(interval: {in: minutes, count: 1})` → replaces column with `toStartOfInterval(ts, ...)`
726pub struct TimeIntervalRequest {
727    pub field_path: String,
728    pub graphql_alias: String,
729    pub column: String,
730    pub unit: String,
731    pub count: i64,
732}
733
734/// Describes a dimension-level aggregation request extracted from the selection set.
735/// `PostBalance(maximum: Block_Slot)` → DimAggRequest { agg_type: ArgMax, ... }
736pub struct DimAggRequest {
737    pub field_path: String,
738    pub graphql_alias: String,
739    pub value_column: String,
740    pub agg_type: DimAggType,
741    pub compare_column: String,
742    pub condition_filter: Option<FilterNode>,
743    pub select_where_value: Option<async_graphql::Value>,
744}
745
746fn extract_quantile_requests(
747    ctx: &async_graphql::dynamic::ResolverContext,
748) -> Vec<QuantileRequest> {
749    let mut requests = Vec::new();
750    for sub_field in ctx.ctx.field().selection_set() {
751        if sub_field.name() != "quantile" { continue; }
752        let args = match sub_field.arguments() {
753            Ok(a) => a,
754            Err(_) => continue,
755        };
756        let of_dim = args.iter()
757            .find(|(k, _)| k.as_str() == "of")
758            .and_then(|(_, v)| match v {
759                async_graphql::Value::Enum(e) => Some(e.to_string()),
760                async_graphql::Value::String(s) => Some(s.clone()),
761                _ => None,
762            })
763            .unwrap_or_else(|| "*".to_string());
764        let level = args.iter()
765            .find(|(k, _)| k.as_str() == "level")
766            .and_then(|(_, v)| match v {
767                async_graphql::Value::Number(n) => n.as_f64(),
768                _ => None,
769            })
770            .unwrap_or(0.5);
771        let alias = sub_field.alias().unwrap_or("quantile").to_string();
772        requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
773    }
774    requests
775}
776
777fn extract_field_aliases(
778    ctx: &async_graphql::dynamic::ResolverContext,
779    cube: &CubeDefinition,
780) -> FieldAliasMap {
781    let flat = cube.flat_dimensions();
782    let mut aliases = Vec::new();
783    collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
784    aliases
785}
786
787fn collect_field_aliases(
788    field: &async_graphql::SelectionField<'_>,
789    prefix: &str,
790    flat: &[(String, crate::cube::definition::Dimension)],
791    metrics: &[MetricDef],
792    out: &mut FieldAliasMap,
793) {
794    for sub in field.selection_set() {
795        let name = sub.name();
796        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
797            continue;
798        }
799        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
800        let has_children = sub.selection_set().next().is_some();
801        if has_children {
802            collect_field_aliases(&sub, &path, flat, metrics, out);
803        } else if let Some(alias) = sub.alias() {
804            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
805                let alias_path = if prefix.is_empty() {
806                    alias.to_string()
807                } else {
808                    format!("{prefix}_{alias}")
809                };
810                out.push((alias_path, dim.column.clone()));
811            }
812        }
813    }
814}
815
816fn extract_calculate_requests(
817    ctx: &async_graphql::dynamic::ResolverContext,
818) -> Vec<CalculateRequest> {
819    let mut requests = Vec::new();
820    for sub_field in ctx.ctx.field().selection_set() {
821        if sub_field.name() != "calculate" { continue; }
822        let args = match sub_field.arguments() {
823            Ok(a) => a,
824            Err(_) => continue,
825        };
826        let expression = args.iter()
827            .find(|(k, _)| k.as_str() == "expression")
828            .and_then(|(_, v)| match v {
829                async_graphql::Value::String(s) => Some(s.clone()),
830                _ => None,
831            });
832        if let Some(expr) = expression {
833            let alias = sub_field.alias().unwrap_or("calculate").to_string();
834            requests.push(CalculateRequest { alias, expression: expr });
835        }
836    }
837    requests
838}
839
840fn extract_dim_agg_requests(
841    ctx: &async_graphql::dynamic::ResolverContext,
842    cube: &CubeDefinition,
843) -> Vec<DimAggRequest> {
844    let flat = cube.flat_dimensions();
845    let mut requests = Vec::new();
846    collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
847    requests
848}
849
850fn collect_dim_agg_paths(
851    field: &async_graphql::SelectionField<'_>,
852    prefix: &str,
853    flat: &[(String, crate::cube::definition::Dimension)],
854    metrics: &[MetricDef],
855    cube: &CubeDefinition,
856    out: &mut Vec<DimAggRequest>,
857) {
858    for sub in field.selection_set() {
859        let name = sub.name();
860        if metrics.iter().any(|m| m.name == name) {
861            continue;
862        }
863        let path = if prefix.is_empty() {
864            name.to_string()
865        } else {
866            format!("{prefix}_{name}")
867        };
868        let has_children = sub.selection_set().next().is_some();
869        if has_children {
870            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
871        } else {
872            let args = match sub.arguments() {
873                Ok(a) => a,
874                Err(_) => continue,
875            };
876            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
877            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
878
879            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
880                let cp = match v {
881                    async_graphql::Value::Enum(e) => e.to_string(),
882                    async_graphql::Value::String(s) => s.clone(),
883                    _ => continue,
884                };
885                (DimAggType::ArgMax, cp)
886            } else if let Some((_, v)) = min_val {
887                let cp = match v {
888                    async_graphql::Value::Enum(e) => e.to_string(),
889                    async_graphql::Value::String(s) => s.clone(),
890                    _ => continue,
891                };
892                (DimAggType::ArgMin, cp)
893            } else {
894                continue;
895            };
896
897            let value_column = flat.iter()
898                .find(|(p, _)| p == &path)
899                .map(|(_, dim)| dim.column.clone());
900            let compare_column = flat.iter()
901                .find(|(p, _)| p == &compare_path)
902                .map(|(_, dim)| dim.column.clone());
903
904            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
905                let condition_filter = args.iter()
906                    .find(|(k, _)| k.as_str() == "if")
907                    .and_then(|(_, v)| {
908                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
909                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
910                    });
911                let select_where_value = args.iter()
912                    .find(|(k, _)| k.as_str() == "selectWhere")
913                    .map(|(_, v)| v.clone());
914
915                let gql_alias = sub.alias()
916                    .map(|a| a.to_string())
917                    .unwrap_or_else(|| path.clone());
918                out.push(DimAggRequest {
919                    field_path: path,
920                    graphql_alias: gql_alias,
921                    value_column: vc,
922                    agg_type,
923                    compare_column: cc,
924                    condition_filter,
925                    select_where_value,
926                });
927            }
928        }
929    }
930}
931
932fn extract_time_interval_requests(
933    ctx: &async_graphql::dynamic::ResolverContext,
934    cube: &CubeDefinition,
935) -> Vec<TimeIntervalRequest> {
936    let flat = cube.flat_dimensions();
937    let mut requests = Vec::new();
938    collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
939    requests
940}
941
942fn collect_time_interval_paths(
943    field: &async_graphql::SelectionField<'_>,
944    prefix: &str,
945    flat: &[(String, crate::cube::definition::Dimension)],
946    metrics: &[MetricDef],
947    out: &mut Vec<TimeIntervalRequest>,
948) {
949    for sub in field.selection_set() {
950        let name = sub.name();
951        if metrics.iter().any(|m| m.name == name) { continue; }
952        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
953        let has_children = sub.selection_set().next().is_some();
954        if has_children {
955            collect_time_interval_paths(&sub, &path, flat, metrics, out);
956        } else {
957            let args = match sub.arguments() {
958                Ok(a) => a,
959                Err(_) => continue,
960            };
961            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
962            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
963                let unit = obj.get("in")
964                    .and_then(|v| match v {
965                        async_graphql::Value::Enum(e) => Some(e.to_string()),
966                        async_graphql::Value::String(s) => Some(s.clone()),
967                        _ => None,
968                    });
969                let count = obj.get("count")
970                    .and_then(|v| match v {
971                        async_graphql::Value::Number(n) => n.as_i64(),
972                        _ => None,
973                    });
974                if let (Some(unit), Some(count)) = (unit, count) {
975                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
976                        let gql_alias = sub.alias().unwrap_or(name).to_string();
977                        out.push(TimeIntervalRequest {
978                            field_path: path,
979                            graphql_alias: gql_alias,
980                            column: dim.column.clone(),
981                            unit,
982                            count,
983                        });
984                    }
985                }
986            }
987        }
988    }
989}
990
991// ---------------------------------------------------------------------------
992// Per-Cube GraphQL type generation
993// ---------------------------------------------------------------------------
994
995struct CubeTypes {
996    objects: Vec<Object>,
997    inputs: Vec<InputObject>,
998    enums: Vec<Enum>,
999    unions: Vec<Union>,
1000}
1001
1002fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
1003    let record_name = format!("{}Record", cube.name);
1004    let filter_name = format!("{}Filter", cube.name);
1005    let compare_enum_name = format!("{}CompareFields", cube.name);
1006
1007    let flat_dims = cube.flat_dimensions();
1008
1009    let mut compare_enum = Enum::new(&compare_enum_name)
1010        .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
1011    for (path, _) in &flat_dims {
1012        compare_enum = compare_enum.item(EnumItem::new(path));
1013    }
1014
1015    let mut record_fields: Vec<Field> = Vec::new();
1016    let mut filter_fields: Vec<InputValue> = Vec::new();
1017    let mut extra_objects: Vec<Object> = Vec::new();
1018    let mut extra_inputs: Vec<InputObject> = Vec::new();
1019    let mut extra_unions: Vec<Union> = Vec::new();
1020
1021    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
1022        .description("OR combinator — matches if any sub-filter matches"));
1023
1024    {
1025        let mut collector = DimCollector {
1026            cube_name: &cube.name,
1027            compare_enum_name: &compare_enum_name,
1028            filter_name: &filter_name,
1029            record_fields: &mut record_fields,
1030            filter_fields: &mut filter_fields,
1031            extra_objects: &mut extra_objects,
1032            extra_inputs: &mut extra_inputs,
1033            extra_unions: &mut extra_unions,
1034        };
1035        for node in &cube.dimensions {
1036            collect_dimension_types(node, "", &mut collector);
1037        }
1038    }
1039
1040    let mut metric_enums: Vec<Enum> = Vec::new();
1041    let builtin_descs: std::collections::HashMap<&str, &str> = [
1042        ("count", "Count of rows or distinct values"),
1043        ("sum", "Sum of values"),
1044        ("avg", "Average of values"),
1045        ("min", "Minimum value"),
1046        ("max", "Maximum value"),
1047        ("uniq", "Count of unique (distinct) values"),
1048    ].into_iter().collect();
1049
1050    for metric in &cube.metrics {
1051        let metric_name = &metric.name;
1052        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1053
1054        if metric.supports_where {
1055            extra_inputs.push(
1056                InputObject::new(&select_where_name)
1057                    .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1058                    .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1059                    .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1060                    .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1061                    .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1062                    .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1063            );
1064        }
1065
1066        let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1067        let mut of_enum = Enum::new(&of_enum_name)
1068            .description(format!("Dimension to apply {} aggregation on", metric_name));
1069        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1070        metric_enums.push(of_enum);
1071
1072        let metric_name_clone = metric_name.clone();
1073        let return_type_ref = dim_type_to_typeref(&metric.return_type);
1074        let metric_desc = metric.description.as_deref()
1075            .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1076            .unwrap_or("Aggregate metric");
1077
1078        let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1079            let default_name = metric_name_clone.clone();
1080            FieldFuture::new(async move {
1081                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1082                let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1083                let key = metric_key(alias);
1084                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1085                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1086            })
1087        })
1088        .description(metric_desc)
1089        .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1090            .description("Dimension to aggregate on (default: all rows)"));
1091
1092        if metric_name == "count" {
1093            metric_field = metric_field
1094                .argument(InputValue::new("distinct", TypeRef::named(&of_enum_name))
1095                    .description("Count distinct values of this dimension (alias for of, maps to uniqExact)"));
1096        }
1097
1098        if metric.supports_where {
1099            metric_field = metric_field
1100                .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1101                    .description("Post-aggregation filter (HAVING)"))
1102                .argument(InputValue::new("if", TypeRef::named(&filter_name))
1103                    .description("Conditional filter for this metric"));
1104        }
1105
1106        record_fields.push(metric_field);
1107    }
1108
1109    // `quantile(of: ..., level: ...)` — percentile computation
1110    {
1111        let of_enum_name = format!("{}_quantile_Of", cube.name);
1112        let mut of_enum = Enum::new(&of_enum_name)
1113            .description(format!("Dimension to apply quantile on for {}", cube.name));
1114        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1115        metric_enums.push(of_enum);
1116
1117        let of_enum_for_closure = of_enum_name.clone();
1118        let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1119            FieldFuture::new(async move {
1120                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1121                let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1122                let key = metric_key(alias);
1123                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1124                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1125            })
1126        })
1127        .description("Compute a quantile (percentile) of a dimension")
1128        .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1129            .description("Dimension to compute quantile on"))
1130        .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1131            .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1132        record_fields.push(quantile_field);
1133    }
1134
1135    // `calculate(expression: "...")` — runtime expression computation
1136    {
1137        let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1138            FieldFuture::new(async move {
1139                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1140                let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1141                let key = metric_key(alias);
1142                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1143                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1144            })
1145        })
1146        .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1147        .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1148            .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1149        record_fields.push(calculate_field);
1150    }
1151
1152    // Add join fields: joinXxx returns the target cube's Record type
1153    for jd in &cube.joins {
1154        let target_record_name = format!("{}Record", jd.target_cube);
1155        let field_name_owned = jd.field_name.clone();
1156        let mut join_field = Field::new(
1157            &jd.field_name,
1158            TypeRef::named(&target_record_name),
1159            move |ctx| {
1160                let field_name = field_name_owned.clone();
1161                FieldFuture::new(async move {
1162                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1163                    if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1164                        let sub_row: RowMap = obj.iter()
1165                            .map(|(k, v)| (k.clone(), v.clone()))
1166                            .collect();
1167                        Ok(Some(FieldValue::owned_any(sub_row)))
1168                    } else {
1169                        Ok(Some(FieldValue::value(Value::Null)))
1170                    }
1171                })
1172            },
1173        );
1174        if let Some(desc) = &jd.description {
1175            join_field = join_field.description(desc);
1176        }
1177        record_fields.push(join_field);
1178    }
1179
1180    let mut record = Object::new(&record_name);
1181    for f in record_fields { record = record.field(f); }
1182
1183    let mut filter = InputObject::new(&filter_name)
1184        .description(format!("Filter conditions for {} query", cube.name));
1185    for f in filter_fields { filter = filter.field(f); }
1186
1187    let orderby_input_name = format!("{}OrderByInput", cube.name);
1188    let orderby_input = InputObject::new(&orderby_input_name)
1189        .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1190        .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1191            .description("Sort descending by this field"))
1192        .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1193            .description("Sort ascending by this field"))
1194        .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1195            .description("Sort descending by computed/aggregated field name"))
1196        .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1197            .description("Sort ascending by computed/aggregated field name"));
1198
1199    let limitby_input_name = format!("{}LimitByInput", cube.name);
1200    let limitby_input = InputObject::new(&limitby_input_name)
1201        .description(format!("Per-group row limit for {}", cube.name))
1202        .field(InputValue::new("by", TypeRef::named_nn(&compare_enum_name))
1203            .description("Dimension field to group by"))
1204        .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
1205            .description("Maximum rows per group"))
1206        .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
1207            .description("Rows to skip per group"));
1208
1209    let mut objects = vec![record]; objects.extend(extra_objects);
1210    let mut inputs = vec![filter, orderby_input, limitby_input]; inputs.extend(extra_inputs);
1211    let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1212
1213    CubeTypes { objects, inputs, enums, unions: extra_unions }
1214}
1215
1216struct DimCollector<'a> {
1217    cube_name: &'a str,
1218    compare_enum_name: &'a str,
1219    filter_name: &'a str,
1220    record_fields: &'a mut Vec<Field>,
1221    filter_fields: &'a mut Vec<InputValue>,
1222    extra_objects: &'a mut Vec<Object>,
1223    extra_inputs: &'a mut Vec<InputObject>,
1224    extra_unions: &'a mut Vec<Union>,
1225}
1226
1227fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1228    match node {
1229        DimensionNode::Leaf(dim) => {
1230            let col = dim.column.clone();
1231            let is_datetime = dim.dim_type == DimType::DateTime;
1232            let compare_enum = c.compare_enum_name.to_string();
1233            let cube_filter = c.filter_name.to_string();
1234            let mut leaf_field = Field::new(
1235                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1236                move |ctx| {
1237                    let col = col.clone();
1238                    FieldFuture::new(async move {
1239                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1240                        let has_interval = ctx.args.try_get("interval").is_ok();
1241                        let has_max = ctx.args.try_get("maximum").is_ok();
1242                        let has_min = ctx.args.try_get("minimum").is_ok();
1243                        let key = if has_interval || has_max || has_min {
1244                            let name = ctx.ctx.field().alias().unwrap_or(&col);
1245                            dim_agg_key(name)
1246                        } else {
1247                            col.clone()
1248                        };
1249                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1250                        let gql_val = if is_datetime {
1251                            json_to_gql_datetime(val)
1252                        } else {
1253                            json_to_gql_value(val)
1254                        };
1255                        Ok(Some(FieldValue::value(gql_val)))
1256                    })
1257                },
1258            );
1259            if let Some(desc) = &dim.description {
1260                leaf_field = leaf_field.description(desc);
1261            }
1262            leaf_field = leaf_field
1263                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1264                    .description("Return value from row where compare field is maximum (argMax)"))
1265                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1266                    .description("Return value from row where compare field is minimum (argMin)"))
1267                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1268                    .description("Conditional filter for aggregation"))
1269                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1270                    .description("Post-aggregation value filter (HAVING)"));
1271            if is_datetime {
1272                leaf_field = leaf_field
1273                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1274                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1275            }
1276            // Update resolver key: if interval is present, read from the interval expression key
1277            // This is handled by the resolver checking ctx.args for "interval"
1278            c.record_fields.push(leaf_field);
1279            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1280        }
1281        DimensionNode::Group { graphql_name, description, children } => {
1282            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1283            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1284            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1285
1286            let mut child_record_fields: Vec<Field> = Vec::new();
1287            let mut child_filter_fields: Vec<InputValue> = Vec::new();
1288            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1289
1290            let mut child_collector = DimCollector {
1291                cube_name: c.cube_name,
1292                compare_enum_name: c.compare_enum_name,
1293                filter_name: c.filter_name,
1294                record_fields: &mut child_record_fields,
1295                filter_fields: &mut child_filter_fields,
1296                extra_objects: c.extra_objects,
1297                extra_inputs: c.extra_inputs,
1298                extra_unions: c.extra_unions,
1299            };
1300            for child in children {
1301                collect_dimension_types(child, &new_prefix, &mut child_collector);
1302            }
1303
1304            let mut nested_record = Object::new(&nested_record_name);
1305            for f in child_record_fields { nested_record = nested_record.field(f); }
1306
1307            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1308            let mut nested_filter = InputObject::new(&nested_filter_name)
1309                .description(nested_filter_desc);
1310            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1311
1312            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1313                FieldFuture::new(async move {
1314                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1315                    Ok(Some(FieldValue::owned_any(row.clone())))
1316                })
1317            });
1318            if let Some(desc) = description {
1319                group_field = group_field.description(desc);
1320            }
1321            c.record_fields.push(group_field);
1322            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1323            c.extra_objects.push(nested_record);
1324            c.extra_inputs.push(nested_filter);
1325        }
1326        DimensionNode::Array { graphql_name, description, children } => {
1327            let full_path = if prefix.is_empty() {
1328                graphql_name.clone()
1329            } else {
1330                format!("{prefix}_{graphql_name}")
1331            };
1332            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1333            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1334
1335            let mut element_obj = Object::new(&element_type_name);
1336            let mut includes_filter = InputObject::new(&includes_filter_name)
1337                .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1338
1339            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1340
1341            for child in children {
1342                match &child.field_type {
1343                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1344                        let col_name = child.column.clone();
1345                        let is_datetime = *dt == DimType::DateTime;
1346                        let mut field = Field::new(
1347                            &child.graphql_name,
1348                            dim_type_to_typeref(dt),
1349                            move |ctx| {
1350                                let col = col_name.clone();
1351                                FieldFuture::new(async move {
1352                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1353                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1354                                    let gql_val = if is_datetime {
1355                                        json_to_gql_datetime(val)
1356                                    } else {
1357                                        json_to_gql_value(val)
1358                                    };
1359                                    Ok(Some(FieldValue::value(gql_val)))
1360                                })
1361                            },
1362                        );
1363                        if let Some(desc) = &child.description {
1364                            field = field.description(desc);
1365                        }
1366                        element_obj = element_obj.field(field);
1367                        includes_filter = includes_filter.field(
1368                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1369                        );
1370                    }
1371                    crate::cube::definition::ArrayFieldType::Union(variants) => {
1372                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1373
1374                        let mut gql_union = Union::new(&union_name);
1375                        let mut variant_objects = Vec::new();
1376
1377                        for v in variants {
1378                            let variant_obj_name = v.type_name.clone();
1379                            let field_name = v.field_name.clone();
1380                            let source_type = v.source_type.clone();
1381                            let type_ref = dim_type_to_typeref(&source_type);
1382
1383                            let val_col = child.column.clone();
1384                            let variant_obj = Object::new(&variant_obj_name)
1385                                .field(Field::new(
1386                                    &field_name,
1387                                    type_ref,
1388                                    move |ctx| {
1389                                        let col = val_col.clone();
1390                                        FieldFuture::new(async move {
1391                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1392                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1393                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
1394                                        })
1395                                    },
1396                                ));
1397                            gql_union = gql_union.possible_type(&variant_obj_name);
1398                            variant_objects.push(variant_obj);
1399                        }
1400
1401                        let col_name = child.column.clone();
1402                        let type_col = children.iter()
1403                            .find(|f| f.graphql_name == "Type")
1404                            .map(|f| f.column.clone())
1405                            .unwrap_or_default();
1406                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1407
1408                        let mut field = Field::new(
1409                            &child.graphql_name,
1410                            TypeRef::named(&union_name),
1411                            move |ctx| {
1412                                let col = col_name.clone();
1413                                let tcol = type_col.clone();
1414                                let vars = variants_clone.clone();
1415                                FieldFuture::new(async move {
1416                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1417                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1418                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
1419                                        return Ok(None);
1420                                    }
1421                                    let type_str = row.get(&tcol)
1422                                        .and_then(|v| v.as_str())
1423                                        .unwrap_or("");
1424                                    let resolved_type = resolve_union_typename(type_str, &vars);
1425                                    let mut elem = RowMap::new();
1426                                    elem.insert(col.clone(), raw_val);
1427                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1428                                })
1429                            },
1430                        );
1431                        if let Some(desc) = &child.description {
1432                            field = field.description(desc);
1433                        }
1434                        element_obj = element_obj.field(field);
1435
1436                        includes_filter = includes_filter.field(
1437                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1438                        );
1439
1440                        union_registrations.push((union_name, gql_union, variant_objects));
1441                    }
1442                }
1443            }
1444
1445            // Register union types and variant objects
1446            for (_, union_type, variant_objs) in union_registrations {
1447                c.extra_objects.extend(variant_objs);
1448                // Unions need to be registered differently — store in extra_objects
1449                // by wrapping in a dummy. Actually, we need to register unions via builder.
1450                // For now, we'll store them in a new field. Let's use the enums vec hack:
1451                // Actually, async-graphql dynamic schema registers unions the same way.
1452                // We need to pass them up. Let's add a unions vec to DimCollector.
1453                // For now, store union as Object (it won't work). We need to refactor.
1454                // Let me add unions to CubeTypes.
1455                c.extra_unions.push(union_type);
1456            }
1457
1458            // Build the list resolver: zip parallel array columns into element objects
1459            let child_columns: Vec<(String, String)> = children.iter()
1460                .map(|f| (f.graphql_name.clone(), f.column.clone()))
1461                .collect();
1462            let element_type_name_clone = element_type_name.clone();
1463
1464            let mut array_field = Field::new(
1465                graphql_name,
1466                TypeRef::named_nn_list_nn(&element_type_name),
1467                move |ctx| {
1468                    let cols = child_columns.clone();
1469                    let _etype = element_type_name_clone.clone();
1470                    FieldFuture::new(async move {
1471                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1472                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1473                            .map(|(gql_name, col)| {
1474                                let arr = row.get(col)
1475                                    .and_then(|v| v.as_array())
1476                                    .cloned()
1477                                    .unwrap_or_default();
1478                                (gql_name.as_str(), arr)
1479                            })
1480                            .collect();
1481
1482                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1483                        let mut elements = Vec::with_capacity(len);
1484                        for i in 0..len {
1485                            let mut elem = RowMap::new();
1486                            for (gql_name, arr) in &arrays {
1487                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1488                                elem.insert(gql_name.to_string(), val);
1489                            }
1490                            // Also store by column name for Union resolvers
1491                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1492                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1493                                elem.insert(col.clone(), val);
1494                            }
1495                            elements.push(FieldValue::owned_any(elem));
1496                        }
1497                        Ok(Some(FieldValue::list(elements)))
1498                    })
1499                },
1500            );
1501            if let Some(desc) = description {
1502                array_field = array_field.description(desc);
1503            }
1504            c.record_fields.push(array_field);
1505
1506            // Wrap the element-level IncludesFilter in an ArrayFilter so the
1507            // parser can find the `includes` key (compiler/filter.rs expects
1508            // `{ includes: [{ Name: { is: "..." } }] }`).
1509            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
1510            let wrapper_filter = InputObject::new(&wrapper_filter_name)
1511                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
1512                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
1513                    .description("Match rows where at least one array element satisfies all conditions"));
1514            c.extra_inputs.push(wrapper_filter);
1515
1516            c.filter_fields.push(InputValue::new(
1517                graphql_name,
1518                TypeRef::named(&wrapper_filter_name),
1519            ));
1520
1521            c.extra_objects.push(element_obj);
1522            c.extra_inputs.push(includes_filter);
1523        }
1524    }
1525}
1526
1527/// Resolve the GraphQL Union __typename from a discriminator column value.
1528///
1529/// Walks variants in order; the first whose `source_type_names` contains
1530/// `type_str` wins.  If none match, the last variant is used as fallback
1531/// (conventionally the "catch-all" variant such as JSON).
1532fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1533    for v in variants {
1534        if v.source_type_names.iter().any(|s| s == type_str) {
1535            return v.type_name.clone();
1536        }
1537    }
1538    variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1539}
1540
1541fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1542    match dt {
1543        DimType::String | DimType::Decimal | DimType::Date => TypeRef::named(TypeRef::STRING),
1544        DimType::DateTime => TypeRef::named("DateTime"),
1545        DimType::Int => TypeRef::named(TypeRef::INT),
1546        DimType::Float => TypeRef::named(TypeRef::FLOAT),
1547        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1548    }
1549}
1550
1551fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1552    match dt {
1553        DimType::String => "StringFilter",
1554        DimType::Int => "IntFilter",
1555        DimType::Float => "FloatFilter",
1556        DimType::Decimal => "DecimalFilter",
1557        DimType::Date => "DateFilter",
1558        DimType::DateTime => "DateTimeFilter",
1559        DimType::Bool => "BoolFilter",
1560    }
1561}
1562
1563pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1564    match v {
1565        serde_json::Value::Null => Value::Null,
1566        serde_json::Value::Bool(b) => Value::from(b),
1567        serde_json::Value::Number(n) => {
1568            if let Some(i) = n.as_i64() { Value::from(i) }
1569            else if let Some(f) = n.as_f64() { Value::from(f) }
1570            else { Value::from(n.to_string()) }
1571        }
1572        serde_json::Value::String(s) => Value::from(s),
1573        _ => Value::from(v.to_string()),
1574    }
1575}
1576
1577/// Build a JoinExpr from a JoinDef and target cube definition.
1578/// Inspects the sub-selection of the join field to determine which columns to SELECT.
1579fn build_join_expr(
1580    jd: &crate::cube::definition::JoinDef,
1581    target_cube: &CubeDefinition,
1582    sub_field: &async_graphql::SelectionField<'_>,
1583    network: &str,
1584    join_idx: usize,
1585) -> JoinExpr {
1586    let target_flat = target_cube.flat_dimensions();
1587    let target_table = target_cube.table_for_chain(network);
1588
1589    let mut requested_paths = HashSet::new();
1590    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1591
1592    let mut selects: Vec<SelectExpr> = target_flat.iter()
1593        .filter(|(path, _)| requested_paths.contains(path))
1594        .map(|(_, dim)| SelectExpr::Column {
1595            column: dim.column.clone(),
1596            alias: None,
1597        })
1598        .collect();
1599
1600    if selects.is_empty() {
1601        selects = target_flat.iter()
1602            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1603            .collect();
1604    }
1605
1606    let is_aggregate = target_flat.iter().any(|(_, dim)| is_aggregate_expr(&dim.column));
1607
1608    let group_by = if is_aggregate {
1609        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1610        for sel in &selects {
1611            if let SelectExpr::Column { column, .. } = sel {
1612                if !is_aggregate_expr(column) && !gb.contains(column) {
1613                    gb.push(column.clone());
1614                }
1615            }
1616        }
1617        gb
1618    } else {
1619        vec![]
1620    };
1621
1622    JoinExpr {
1623        schema: target_cube.schema.clone(),
1624        table: target_table,
1625        alias: format!("_j{}", join_idx),
1626        conditions: jd.conditions.clone(),
1627        selects,
1628        group_by,
1629        use_final: target_cube.use_final,
1630        is_aggregate,
1631        target_cube: jd.target_cube.clone(),
1632        join_field: sub_field.name().to_string(),
1633        join_type: jd.join_type.clone(),
1634    }
1635}
1636
1637/// Convert a ClickHouse DateTime value to ISO 8601 format.
1638/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
1639fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1640    match v {
1641        serde_json::Value::String(s) => {
1642            let iso = if s.contains('T') {
1643                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1644            } else {
1645                let replaced = s.replacen(' ', "T", 1);
1646                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1647            };
1648            Value::from(iso)
1649        }
1650        other => json_to_gql_value(other),
1651    }
1652}