// activecube_rs/schema/generator.rs
1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr, is_aggregate_expr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Canonical key naming for metric SQL aliases. Used by parser and resolver.
///
/// Prepends `__` to the GraphQL alias, e.g. `count` → `__count`.
pub fn metric_key(alias: &str) -> String {
    let mut key = String::with_capacity(alias.len() + 2);
    key.push_str("__");
    key.push_str(alias);
    key
}
17
/// Canonical key naming for dimension aggregate / time interval SQL aliases.
///
/// Prepends `__da_` to the GraphQL alias, e.g. `PostBalance` → `__da_PostBalance`.
pub fn dim_agg_key(alias: &str) -> String {
    ["__da_", alias].concat()
}
20
/// Async function type that executes a compiled SQL query and returns rows.
/// The service layer provides this — the library never touches a database directly.
///
/// Arguments: the compiled SQL text and its positional bind values.
/// Errors are plain strings; `build_cube_field` wraps them into GraphQL errors.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// A custom argument on the chain wrapper field (e.g. dataset, aggregates).
/// The library registers it on the GraphQL wrapper type and stores the resolved
/// value in `ChainContext.extra` for child cube resolvers to read.
#[derive(Debug, Clone)]
pub struct WrapperArg {
    /// GraphQL argument name (e.g. "dataset").
    pub name: String,
    /// Name of a type already registered in the schema (e.g. "Dataset").
    /// Registered as a nullable argument (`TypeRef::named`) in `build_schema`.
    pub type_ref: String,
    /// Description shown in schema introspection.
    pub description: String,
    /// Enum values, used by builder schema generation. None for non-enum types.
    pub values: Option<Vec<String>>,
}
43
/// Describes one top-level chain wrapper type (e.g. EVM, Solana, Trading).
///
/// Each config becomes one wrapper field on the root `Query` type; cubes whose
/// `chain_groups` contain `group` are attached as fields of the wrapper type.
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    /// Wrapper type name in the schema, e.g. "EVM", "Solana", "Trading".
    pub name: String,
    /// Which ChainGroup enum value this config represents.
    pub group: ChainGroup,
    /// Available networks. EVM: ["eth","bsc"]; Solana: ["sol"]; Trading: all chains.
    /// The first entry is the default when the `network` argument is absent.
    pub networks: Vec<String>,
    /// If true the wrapper type takes a `network` argument (EVM style).
    /// If false the chain is implicit (Solana style) or cross-chain (Trading).
    pub has_network_arg: bool,
    /// Additional arguments on the wrapper field, passed through to child resolvers
    /// via `ChainContext.extra`.
    pub extra_args: Vec<WrapperArg>,
    /// Custom name for the network enum type registered in the schema.
    /// Defaults to `"{name}Network"` (e.g. `EVMNetwork`) when `None`.
    /// Set to e.g. `Some("evm_network".into())` for Bitquery-compatible naming.
    pub network_enum_name: Option<String>,
}
64
/// Callback to transform the resolved table name based on `ChainContext`.
/// Called after `resolve_table` picks the base table, before SQL compilation.
/// Arguments: `(table_name, chain_context) -> transformed_table_name`.
///
/// Registered as schema data in `build_schema` and looked up per-request in
/// `build_cube_field` (e.g. to append a dataset suffix from `extra`).
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
69
/// Configuration for supported networks (chains) and optional stats collection.
pub struct SchemaConfig {
    /// One entry per top-level wrapper field on the root query type.
    pub chain_groups: Vec<ChainGroupConfig>,
    // NOTE(review): `root_query_name` is not referenced anywhere in this file —
    // `build_schema` hardcodes "Query". Confirm whether it is consumed elsewhere
    // (e.g. builder schema generation) or should drive `Schema::build`.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    pub stats_callback: Option<StatsCallback>,
    /// Optional hook to transform table names based on chain context (e.g. dataset suffix).
    pub table_name_transform: Option<TableNameTransform>,
    /// Additional global enum types to register in the schema (e.g. Dataset, Aggregates).
    pub extra_types: Vec<Enum>,
}
82
83impl Default for SchemaConfig {
84    fn default() -> Self {
85        Self {
86            chain_groups: vec![
87                ChainGroupConfig {
88                    name: "EVM".to_string(),
89                    group: ChainGroup::Evm,
90                    networks: vec!["eth".into(), "bsc".into()],
91                    has_network_arg: true,
92                    extra_args: vec![],
93                    network_enum_name: None,
94                },
95                ChainGroupConfig {
96                    name: "Solana".to_string(),
97                    group: ChainGroup::Solana,
98                    networks: vec!["sol".into()],
99                    has_network_arg: false,
100                    extra_args: vec![],
101                    network_enum_name: None,
102                },
103                ChainGroupConfig {
104                    name: "Trading".to_string(),
105                    group: ChainGroup::Trading,
106                    networks: vec!["sol".into(), "eth".into(), "bsc".into()],
107                    has_network_arg: false,
108                    extra_args: vec![],
109                    network_enum_name: None,
110                },
111            ],
112            root_query_name: "ChainStream".to_string(),
113            stats_callback: None,
114            table_name_transform: None,
115            extra_types: vec![],
116        }
117    }
118}
119
/// Stored in the parent value of chain wrapper types so cube resolvers can
/// retrieve the active chain without a `network` argument on themselves.
/// Service-layer extra_args values are available in the `extra` map.
pub struct ChainContext {
    /// Active network id (e.g. "eth", "sol"); falls back to the group's first
    /// network when the `network` argument is absent.
    pub network: String,
    /// Resolved `WrapperArg` values, keyed by argument name, stringified
    /// (enum name, boolean, or string — see the wrapper resolver).
    pub extra: HashMap<String, String>,
}
127
/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
///
/// Schema shape (Bitquery-aligned):
/// ```graphql
/// type Query {
///   EVM(network: EVMNetwork!): EVM!
///   Solana: Solana!
///   Trading: Trading!
///   _cubeMetadata: String!
/// }
/// ```
///
/// Cube object/input/enum types are registered once and shared by every chain
/// group; each wrapper field resolves to an owned `ChainContext` which child
/// cube resolvers downcast to read the active network and extra arguments.
///
/// NOTE(review): `config.root_query_name` is never read here — the root type
/// name is hardcoded to "Query". Confirm whether that is intentional.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    let mut builder = Schema::build("Query", None, None);

    // --- shared input / enum types -------------------------------------------
    builder = builder.register(filter_types::build_limit_input());
    // LimitByInput is now generated per-cube in build_cube_types() with typed enum for `by`
    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }
    builder = builder.register(
        Scalar::new("DateTime")
            .description("ISO 8601 date-time string (e.g. \"2026-04-01T00:00:00Z\")"),
    );
    builder = builder.register(
        Scalar::new("ISO8601DateTime")
            .description("ISO 8601 date-time string (alias for DateTime, V1 compatibility)"),
    );
    builder = builder.register(
        Enum::new("TimeUnit")
            .description("Time unit for interval bucketing")
            .item(EnumItem::new("seconds"))
            .item(EnumItem::new("minutes"))
            .item(EnumItem::new("hours"))
            .item(EnumItem::new("days"))
            .item(EnumItem::new("months")),
    );
    builder = builder.register(
        InputObject::new("TimeIntervalInput")
            .description("Time bucketing interval for DateTime dimensions")
            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
    );
    // Bounds are String-typed; presumably coerced downstream when the HAVING
    // clause is compiled — confirm against the dialect implementation.
    builder = builder.register(
        InputObject::new("DimSelectWhere")
            .description("Post-aggregation filter for dimension values (HAVING clause)")
            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
    );

    // Application-provided global types (e.g. Dataset, Aggregates).
    // This consumes `config.extra_types`; other fields remain usable below.
    for extra_enum in config.extra_types {
        builder = builder.register(extra_enum);
    }

    // --- per-group network enums ---------------------------------------------
    // Only groups that expose a `network` argument need an enum type.
    for grp in &config.chain_groups {
        if grp.has_network_arg {
            let enum_name = grp.network_enum_name.clone()
                .unwrap_or_else(|| format!("{}Network", grp.name));
            let mut net_enum = Enum::new(&enum_name)
                .description(format!("{} network selector", grp.name));
            for net in &grp.networks {
                net_enum = net_enum.item(EnumItem::new(net));
            }
            builder = builder.register(net_enum);
        }
    }

    // --- register cube types once (shared across chain groups) ----------------
    // A cube may belong to several groups; its types must only be registered once.
    let mut registered_cubes: HashSet<String> = HashSet::new();
    for cube in registry.cubes() {
        if registered_cubes.contains(&cube.name) { continue; }
        registered_cubes.insert(cube.name.clone());

        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }
        for un in types.unions { builder = builder.register(un); }
    }

    // --- build chain wrapper types + Query fields ----------------------------
    let mut query = Object::new("Query");

    for grp in &config.chain_groups {
        let wrapper_type_name = grp.name.clone();

        // Build the wrapper Object (e.g. "EVM", "Solana", "Trading") and attach
        // cube fields that belong to this group.
        let mut wrapper_obj = Object::new(&wrapper_type_name);

        for cube in registry.cubes() {
            if !cube.chain_groups.contains(&grp.group) { continue; }

            let cube_field = build_cube_field(
                cube,
                dialect.clone(),
                executor.clone(),
                config.stats_callback.clone(),
            );
            wrapper_obj = wrapper_obj.field(cube_field);
        }
        builder = builder.register(wrapper_obj);

        // Wrapper resolver on Query — stores ChainContext for child resolvers.
        // Cloned values below are moved into the 'static resolver closure.
        let default_network = grp.networks.first().cloned().unwrap_or_default();
        let has_net_arg = grp.has_network_arg;
        let net_enum_name = grp.network_enum_name.clone()
            .unwrap_or_else(|| format!("{}Network", grp.name));
        let wrapper_type_for_resolver = wrapper_type_name.clone();
        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();

        let mut wrapper_field = Field::new(
            &wrapper_type_name,
            TypeRef::named_nn(&wrapper_type_name),
            move |ctx| {
                let default = default_network.clone();
                let has_arg = has_net_arg;
                let arg_names = extra_arg_names.clone();
                FieldFuture::new(async move {
                    // Missing/invalid `network` silently falls back to the
                    // group's first network (also used when there is no arg).
                    let network = if has_arg {
                        ctx.args.try_get("network")
                            .ok()
                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
                            .unwrap_or(default)
                    } else {
                        default
                    };
                    // Stringify extra args (enum → bool → string, first match wins)
                    // so child resolvers can read them from ChainContext.extra.
                    let mut extra = HashMap::new();
                    for arg_name in &arg_names {
                        if let Ok(val) = ctx.args.try_get(arg_name) {
                            let resolved = val.enum_name().ok().map(|s| s.to_string())
                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
                                .or_else(|| val.string().ok().map(|s| s.to_string()));
                            if let Some(v) = resolved {
                                extra.insert(arg_name.clone(), v);
                            }
                        }
                    }
                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
                })
            },
        );

        if grp.has_network_arg {
            let net_type_ref = if grp.networks.len() > 1 {
                TypeRef::named_nn(&net_enum_name) // required when multiple networks
            } else {
                TypeRef::named(&net_enum_name) // optional when single network (defaults to first)
            };
            wrapper_field = wrapper_field.argument(
                InputValue::new("network", net_type_ref)
                    .description(format!("{} network to query", wrapper_type_for_resolver)),
            );
        }
        for wa in &grp.extra_args {
            wrapper_field = wrapper_field.argument(
                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
                    .description(&wa.description),
            );
        }

        query = query.field(wrapper_field);
    }

    // --- _cubeMetadata -------------------------------------------------------
    // The metadata resolver needs its own clone of the registry; the original
    // `registry` is stored as schema data for cube resolvers further below.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
        .map(|g| (g.name.clone(), g.group.clone()))
        .collect();
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            let groups = metadata_groups.clone();
            FieldFuture::new(async move {
                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
                for (group_name, group_enum) in &groups {
                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
                        .filter(|c| c.chain_groups.contains(group_enum))
                        .map(serialize_cube_metadata)
                        .collect();
                    group_metadata.push(serde_json::json!({
                        "group": group_name,
                        "cubes": cubes_in_group,
                    }));
                }
                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes grouped by chain");
    query = query.field(metadata_field);

    builder = builder.register(query);
    builder = builder.data(registry);

    // Table-name hook is optional; when present cube resolvers look it up by type.
    if let Some(transform) = config.table_name_transform.clone() {
        builder = builder.data(transform);
    }

    builder.finish()
}
349
350fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
351    serde_json::json!({
352        "name": cube.name,
353        "description": cube.description,
354        "schema": cube.schema,
355        "tablePattern": cube.table_pattern,
356        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
357        "metrics": cube.metrics.iter().map(|m| {
358            let mut obj = serde_json::json!({
359                "name": m.name,
360                "returnType": format!("{:?}", m.return_type),
361            });
362            if let Some(ref tmpl) = m.expression_template {
363                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
364            }
365            if let Some(ref desc) = m.description {
366                obj["description"] = serde_json::Value::String(desc.clone());
367            }
368            obj
369        }).collect::<Vec<_>>(),
370        "selectors": cube.selectors.iter().map(|s| {
371            serde_json::json!({
372                "name": s.graphql_name,
373                "column": s.column,
374                "type": format!("{:?}", s.dim_type),
375            })
376        }).collect::<Vec<_>>(),
377        "dimensions": serialize_dims(&cube.dimensions),
378        "joins": cube.joins.iter().map(|j| {
379            serde_json::json!({
380                "field": j.field_name,
381                "target": j.target_cube,
382                "joinType": format!("{:?}", j.join_type),
383            })
384        }).collect::<Vec<_>>(),
385        "tableRoutes": cube.table_routes.iter().map(|r| {
386            serde_json::json!({
387                "schema": r.schema,
388                "tablePattern": r.table_pattern,
389                "availableColumns": r.available_columns,
390                "priority": r.priority,
391            })
392        }).collect::<Vec<_>>(),
393        "defaultLimit": cube.default_limit,
394        "maxLimit": cube.max_limit,
395    })
396}
397
/// Build a single cube Field that reads `network` from the parent ChainContext.
///
/// The resolver pipeline per request:
/// 1. parse selection set + arguments into IR (metrics, quantiles, calcs, dims),
/// 2. attach join expressions for selected join fields,
/// 3. optionally rewrite the table name via `TableNameTransform`,
/// 4. validate, compile to SQL via the dialect, execute via `executor`,
/// 5. post-process rows (alias remap, nest join columns), report stats.
fn build_cube_field(
    cube: &CubeDefinition,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    stats_cb: Option<StatsCallback>,
) -> Field {
    // Cloned up front: the resolver closure must be 'static and self-contained.
    let cube_name = cube.name.clone();
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let cube_description = cube.description.clone();

    let mut field = Field::new(
        &cube.name,
        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
        move |ctx| {
            let cube_name = cube_name.clone();
            let dialect = dialect.clone();
            let executor = executor.clone();
            let stats_cb = stats_cb.clone();
            FieldFuture::new(async move {
                let registry = ctx.ctx.data::<CubeRegistry>()?;

                // ChainContext is set by the wrapper resolver.
                // NOTE(review): when absent, network falls back to "sol" —
                // confirm this hard-coded default is intended.
                let chain_ctx = ctx.parent_value
                    .try_downcast_ref::<ChainContext>().ok();
                let network = chain_ctx
                    .map(|c| c.network.as_str())
                    .unwrap_or("sol");

                let cube_def = registry.get(&cube_name).ok_or_else(|| {
                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                })?;

                // Collect everything the parser needs from the selection set.
                let metric_requests = extract_metric_requests(&ctx, cube_def);
                let quantile_requests = extract_quantile_requests(&ctx);
                let calculate_requests = extract_calculate_requests(&ctx);
                let field_aliases = extract_field_aliases(&ctx, cube_def);
                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
                let requested = extract_requested_fields(&ctx, cube_def);
                let mut ir = compiler::parser::parse_cube_query(
                    cube_def,
                    network,
                    &ctx.args,
                    &metric_requests,
                    &quantile_requests,
                    &calculate_requests,
                    &field_aliases,
                    &dim_agg_requests,
                    &time_intervals,
                    Some(requested),
                )?;

                // Selected fields that match a join definition become join
                // expressions in the IR; unknown target cubes are skipped.
                let mut join_idx = 0usize;
                for sub_field in ctx.ctx.field().selection_set() {
                    let fname = sub_field.name().to_string();
                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                    if let Some(jd) = join_def {
                        if let Some(target_cube) = registry.get(&jd.target_cube) {
                            let join_expr = build_join_expr(
                                jd, target_cube, &sub_field, network, join_idx,
                            );
                            ir.joins.push(join_expr);
                            join_idx += 1;
                        }
                    }
                }

                // Optional service-layer table rename (e.g. dataset suffix);
                // only applied when a ChainContext parent exists.
                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
                    if let Some(cctx) = chain_ctx {
                        ir.table = transform(&ir.table, cctx);
                    }
                }

                let validated = compiler::validator::validate(ir)?;
                let result = dialect.compile(&validated);
                let sql = result.sql;
                let bindings = result.bindings;

                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                    async_graphql::Error::new(format!("Query execution failed: {e}"))
                })?;

                // Rename dialect-generated aliases back to their original keys.
                // or_insert: an existing key under the original name wins.
                let rows = if result.alias_remap.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for (alias, original) in &result.alias_remap {
                            if let Some(val) = row.shift_remove(alias) {
                                row.entry(original.clone()).or_insert(val);
                            }
                        }
                        row
                    }).collect()
                };

                // Fold flat "{join_alias}.{col}" columns into one nested JSON
                // object per join so the GraphQL shape matches the selection.
                let rows: Vec<RowMap> = if validated.joins.is_empty() {
                    rows
                } else {
                    rows.into_iter().map(|mut row| {
                        for join in &validated.joins {
                            let prefix = format!("{}.", join.alias);
                            let mut sub_row = RowMap::new();
                            let keys: Vec<String> = row.keys()
                                .filter(|k| k.starts_with(&prefix))
                                .cloned()
                                .collect();
                            for key in keys {
                                if let Some(val) = row.shift_remove(&key) {
                                    sub_row.insert(key[prefix.len()..].to_string(), val);
                                }
                            }
                            let obj: serde_json::Map<String, serde_json::Value> =
                                sub_row.into_iter().collect();
                            row.insert(
                                join.join_field.clone(),
                                serde_json::Value::Object(obj),
                            );
                        }
                        row
                    }).collect()
                };

                // A request-scoped callback (schema data) takes precedence over
                // the callback captured at schema-build time.
                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                    .or_else(|| stats_cb.clone());
                if let Some(cb) = effective_cb {
                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                    cb(stats);
                }

                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                Ok(Some(FieldValue::list(values)))
            })
        },
    );
    if !cube_description.is_empty() {
        field = field.description(&cube_description);
    }
    // Standard cube arguments shared by every cube field.
    field = field
        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
            .description("Filter conditions"))
        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
            .description("Pagination control"))
        .argument(InputValue::new("limitBy", TypeRef::named(format!("{}LimitByInput", cube.name)))
            .description("Per-group row limit"))
        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
            .description("Sort order (Bitquery-compatible)"));

    // Selector shorthands: one typed filter argument per configured selector.
    for sel in &cube.selectors {
        let filter_type = dim_type_to_filter_name(&sel.dim_type);
        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
            .description(format!("Shorthand filter for {}", sel.graphql_name)));
    }

    field
}
553
554fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
555    serde_json::Value::Array(dims.iter().map(|d| match d {
556        DimensionNode::Leaf(dim) => {
557            let mut obj = serde_json::json!({
558                "name": dim.graphql_name,
559                "column": dim.column,
560                "type": format!("{:?}", dim.dim_type),
561            });
562            if let Some(desc) = &dim.description {
563                obj["description"] = serde_json::Value::String(desc.clone());
564            }
565            obj
566        },
567        DimensionNode::Group { graphql_name, description, children } => {
568            let mut obj = serde_json::json!({
569                "name": graphql_name,
570                "children": serialize_dims(children),
571            });
572            if let Some(desc) = description {
573                obj["description"] = serde_json::Value::String(desc.clone());
574            }
575            obj
576        },
577        DimensionNode::Array { graphql_name, description, children } => {
578            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
579                let type_value = match &f.field_type {
580                    crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
581                        "kind": "scalar",
582                        "scalarType": format!("{:?}", dt),
583                    }),
584                    crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
585                        "kind": "union",
586                        "variants": variants.iter().map(|v| serde_json::json!({
587                            "typeName": v.type_name,
588                            "fieldName": v.field_name,
589                            "sourceType": format!("{:?}", v.source_type),
590                        })).collect::<Vec<_>>(),
591                    }),
592                };
593                let mut field_obj = serde_json::json!({
594                    "name": f.graphql_name,
595                    "column": f.column,
596                    "type": type_value,
597                });
598                if let Some(desc) = &f.description {
599                    field_obj["description"] = serde_json::Value::String(desc.clone());
600                }
601                field_obj
602            }).collect();
603            let mut obj = serde_json::json!({
604                "name": graphql_name,
605                "kind": "array",
606                "fields": fields,
607            });
608            if let Some(desc) = description {
609                obj["description"] = serde_json::Value::String(desc.clone());
610            }
611            obj
612        },
613    }).collect())
614}
615
616/// Extract metric requests from the GraphQL selection set by inspecting
617/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
618/// the "count" field in the selection set and extract its `of` argument.
619fn extract_metric_requests(
620    ctx: &async_graphql::dynamic::ResolverContext,
621    cube: &CubeDefinition,
622) -> Vec<compiler::parser::MetricRequest> {
623    let mut requests = Vec::new();
624
625    for sub_field in ctx.ctx.field().selection_set() {
626        let name = sub_field.name();
627        if !cube.has_metric(name) {
628            continue;
629        }
630
631        let args = match sub_field.arguments() {
632            Ok(args) => args,
633            Err(_) => continue,
634        };
635
636        let of_dimension = args
637            .iter()
638            .find(|(k, _)| k.as_str() == "of")
639            .or_else(|| args.iter().find(|(k, _)| k.as_str() == "distinct"))
640            .and_then(|(_, v)| match v {
641                async_graphql::Value::Enum(e) => Some(e.to_string()),
642                async_graphql::Value::String(s) => Some(s.clone()),
643                _ => None,
644            })
645            .unwrap_or_else(|| "*".to_string());
646
647        let select_where_value = args
648            .iter()
649            .find(|(k, _)| k.as_str() == "selectWhere")
650            .map(|(_, v)| v.clone());
651
652        let condition_filter = args
653            .iter()
654            .find(|(k, _)| k.as_str() == "if")
655            .and_then(|(_, v)| {
656                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
657                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
658            });
659
660        let gql_alias = sub_field.alias().unwrap_or(name).to_string();
661        requests.push(compiler::parser::MetricRequest {
662            function: name.to_string(),
663            alias: gql_alias,
664            of_dimension,
665            select_where_value,
666            condition_filter,
667        });
668    }
669
670    requests
671}
672
673fn extract_requested_fields(
674    ctx: &async_graphql::dynamic::ResolverContext,
675    cube: &CubeDefinition,
676) -> HashSet<String> {
677    let mut fields = HashSet::new();
678    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
679    fields
680}
681
682fn collect_selection_paths(
683    field: &async_graphql::SelectionField<'_>,
684    prefix: &str,
685    out: &mut HashSet<String>,
686    metrics: &[MetricDef],
687) {
688    for sub in field.selection_set() {
689        let name = sub.name();
690        if metrics.iter().any(|m| m.name == name) {
691            continue;
692        }
693        let path = if prefix.is_empty() {
694            name.to_string()
695        } else {
696            format!("{prefix}_{name}")
697        };
698        let has_children = sub.selection_set().next().is_some();
699        if has_children {
700            collect_selection_paths(&sub, &path, out, metrics);
701        } else {
702            out.insert(path);
703        }
704    }
705}
706
/// Maps `{parent_path}_{graphql_alias}` → ClickHouse column name for aliased dimension fields.
/// Used by `calculate` to resolve `$TokenSupplyUpdate_post` → column.
/// A Vec of pairs rather than a map: ordering is preserved and duplicates allowed.
pub type FieldAliasMap = Vec<(String, String)>;
710
/// Describes a `quantile(of: ..., level: ...)` request.
pub struct QuantileRequest {
    /// Output alias — the GraphQL field alias, or "quantile" when none given.
    pub alias: String,
    /// Dimension the quantile is computed over; "*" when unspecified.
    pub of_dimension: String,
    /// Quantile level, defaulting to 0.5 (median). Expected in [0, 1] but not
    /// validated at extraction time.
    pub level: f64,
}
717
/// Describes a `calculate(expression: "...")` request.
pub struct CalculateRequest {
    /// Output alias for the computed column.
    pub alias: String,
    /// Raw expression text as supplied by the client (parsed downstream).
    pub expression: String,
}
723
/// Describes a time interval bucketing request.
/// `Time(interval: {in: minutes, count: 1})` → replaces column with `toStartOfInterval(ts, ...)`
pub struct TimeIntervalRequest {
    /// Flattened `Parent_Child` path of the DateTime dimension field.
    pub field_path: String,
    /// Alias the client used for the field (falls back to the field name).
    pub graphql_alias: String,
    /// Underlying timestamp column to bucket.
    pub column: String,
    /// TimeUnit enum value as a string (e.g. "minutes").
    pub unit: String,
    /// Number of `unit`s per bucket.
    pub count: i64,
}
733
/// Describes a dimension-level aggregation request extracted from the selection set.
/// `PostBalance(maximum: Block_Slot)` → DimAggRequest { agg_type: ArgMax, ... }
pub struct DimAggRequest {
    /// Flattened dimension path of the field being aggregated.
    pub field_path: String,
    /// GraphQL alias when present, otherwise the field path.
    pub graphql_alias: String,
    /// Backing column of the aggregated dimension.
    pub value_column: String,
    /// ArgMax for a `maximum:` argument, ArgMin for `minimum:`.
    pub agg_type: DimAggType,
    /// Backing column of the dimension named by the maximum/minimum argument.
    pub compare_column: String,
    /// Parsed `if:` argument; None when absent, unparseable, or empty.
    pub condition_filter: Option<FilterNode>,
    /// Raw `selectWhere:` argument value, passed through unparsed.
    pub select_where_value: Option<async_graphql::Value>,
}
745
746fn extract_quantile_requests(
747    ctx: &async_graphql::dynamic::ResolverContext,
748) -> Vec<QuantileRequest> {
749    let mut requests = Vec::new();
750    for sub_field in ctx.ctx.field().selection_set() {
751        if sub_field.name() != "quantile" { continue; }
752        let args = match sub_field.arguments() {
753            Ok(a) => a,
754            Err(_) => continue,
755        };
756        let of_dim = args.iter()
757            .find(|(k, _)| k.as_str() == "of")
758            .and_then(|(_, v)| match v {
759                async_graphql::Value::Enum(e) => Some(e.to_string()),
760                async_graphql::Value::String(s) => Some(s.clone()),
761                _ => None,
762            })
763            .unwrap_or_else(|| "*".to_string());
764        let level = args.iter()
765            .find(|(k, _)| k.as_str() == "level")
766            .and_then(|(_, v)| match v {
767                async_graphql::Value::Number(n) => n.as_f64(),
768                _ => None,
769            })
770            .unwrap_or(0.5);
771        let alias = sub_field.alias().unwrap_or("quantile").to_string();
772        requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
773    }
774    requests
775}
776
777fn extract_field_aliases(
778    ctx: &async_graphql::dynamic::ResolverContext,
779    cube: &CubeDefinition,
780) -> FieldAliasMap {
781    let flat = cube.flat_dimensions();
782    let mut aliases = Vec::new();
783    collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
784    aliases
785}
786
787fn collect_field_aliases(
788    field: &async_graphql::SelectionField<'_>,
789    prefix: &str,
790    flat: &[(String, crate::cube::definition::Dimension)],
791    metrics: &[MetricDef],
792    out: &mut FieldAliasMap,
793) {
794    for sub in field.selection_set() {
795        let name = sub.name();
796        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
797            continue;
798        }
799        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
800        let has_children = sub.selection_set().next().is_some();
801        if has_children {
802            collect_field_aliases(&sub, &path, flat, metrics, out);
803        } else if let Some(alias) = sub.alias() {
804            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
805                let alias_path = if prefix.is_empty() {
806                    alias.to_string()
807                } else {
808                    format!("{prefix}_{alias}")
809                };
810                out.push((alias_path, dim.column.clone()));
811            }
812        }
813    }
814}
815
816fn extract_calculate_requests(
817    ctx: &async_graphql::dynamic::ResolverContext,
818) -> Vec<CalculateRequest> {
819    let mut requests = Vec::new();
820    for sub_field in ctx.ctx.field().selection_set() {
821        if sub_field.name() != "calculate" { continue; }
822        let args = match sub_field.arguments() {
823            Ok(a) => a,
824            Err(_) => continue,
825        };
826        let expression = args.iter()
827            .find(|(k, _)| k.as_str() == "expression")
828            .and_then(|(_, v)| match v {
829                async_graphql::Value::String(s) => Some(s.clone()),
830                _ => None,
831            });
832        if let Some(expr) = expression {
833            let alias = sub_field.alias().unwrap_or("calculate").to_string();
834            requests.push(CalculateRequest { alias, expression: expr });
835        }
836    }
837    requests
838}
839
840fn extract_dim_agg_requests(
841    ctx: &async_graphql::dynamic::ResolverContext,
842    cube: &CubeDefinition,
843) -> Vec<DimAggRequest> {
844    let flat = cube.flat_dimensions();
845    let mut requests = Vec::new();
846    collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
847    requests
848}
849
850fn collect_dim_agg_paths(
851    field: &async_graphql::SelectionField<'_>,
852    prefix: &str,
853    flat: &[(String, crate::cube::definition::Dimension)],
854    metrics: &[MetricDef],
855    cube: &CubeDefinition,
856    out: &mut Vec<DimAggRequest>,
857) {
858    for sub in field.selection_set() {
859        let name = sub.name();
860        if metrics.iter().any(|m| m.name == name) {
861            continue;
862        }
863        let path = if prefix.is_empty() {
864            name.to_string()
865        } else {
866            format!("{prefix}_{name}")
867        };
868        let has_children = sub.selection_set().next().is_some();
869        if has_children {
870            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
871        } else {
872            let args = match sub.arguments() {
873                Ok(a) => a,
874                Err(_) => continue,
875            };
876            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
877            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
878
879            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
880                let cp = match v {
881                    async_graphql::Value::Enum(e) => e.to_string(),
882                    async_graphql::Value::String(s) => s.clone(),
883                    _ => continue,
884                };
885                (DimAggType::ArgMax, cp)
886            } else if let Some((_, v)) = min_val {
887                let cp = match v {
888                    async_graphql::Value::Enum(e) => e.to_string(),
889                    async_graphql::Value::String(s) => s.clone(),
890                    _ => continue,
891                };
892                (DimAggType::ArgMin, cp)
893            } else {
894                continue;
895            };
896
897            let value_column = flat.iter()
898                .find(|(p, _)| p == &path)
899                .map(|(_, dim)| dim.column.clone());
900            let compare_column = flat.iter()
901                .find(|(p, _)| p == &compare_path)
902                .map(|(_, dim)| dim.column.clone());
903
904            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
905                let condition_filter = args.iter()
906                    .find(|(k, _)| k.as_str() == "if")
907                    .and_then(|(_, v)| {
908                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
909                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
910                    });
911                let select_where_value = args.iter()
912                    .find(|(k, _)| k.as_str() == "selectWhere")
913                    .map(|(_, v)| v.clone());
914
915                let gql_alias = sub.alias()
916                    .map(|a| a.to_string())
917                    .unwrap_or_else(|| path.clone());
918                out.push(DimAggRequest {
919                    field_path: path,
920                    graphql_alias: gql_alias,
921                    value_column: vc,
922                    agg_type,
923                    compare_column: cc,
924                    condition_filter,
925                    select_where_value,
926                });
927            }
928        }
929    }
930}
931
932fn extract_time_interval_requests(
933    ctx: &async_graphql::dynamic::ResolverContext,
934    cube: &CubeDefinition,
935) -> Vec<TimeIntervalRequest> {
936    let flat = cube.flat_dimensions();
937    let mut requests = Vec::new();
938    collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
939    requests
940}
941
942fn collect_time_interval_paths(
943    field: &async_graphql::SelectionField<'_>,
944    prefix: &str,
945    flat: &[(String, crate::cube::definition::Dimension)],
946    metrics: &[MetricDef],
947    out: &mut Vec<TimeIntervalRequest>,
948) {
949    for sub in field.selection_set() {
950        let name = sub.name();
951        if metrics.iter().any(|m| m.name == name) { continue; }
952        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
953        let has_children = sub.selection_set().next().is_some();
954        if has_children {
955            collect_time_interval_paths(&sub, &path, flat, metrics, out);
956        } else {
957            let args = match sub.arguments() {
958                Ok(a) => a,
959                Err(_) => continue,
960            };
961            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
962            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
963                let unit = obj.get("in")
964                    .and_then(|v| match v {
965                        async_graphql::Value::Enum(e) => Some(e.to_string()),
966                        async_graphql::Value::String(s) => Some(s.clone()),
967                        _ => None,
968                    });
969                let count = obj.get("count")
970                    .and_then(|v| match v {
971                        async_graphql::Value::Number(n) => n.as_i64(),
972                        _ => None,
973                    });
974                if let (Some(unit), Some(count)) = (unit, count) {
975                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
976                        let gql_alias = sub.alias().unwrap_or(name).to_string();
977                        out.push(TimeIntervalRequest {
978                            field_path: path,
979                            graphql_alias: gql_alias,
980                            column: dim.column.clone(),
981                            unit,
982                            count,
983                        });
984                    }
985                }
986            }
987        }
988    }
989}
990
991// ---------------------------------------------------------------------------
992// Per-Cube GraphQL type generation
993// ---------------------------------------------------------------------------
994
/// All dynamic GraphQL types generated for one cube by `build_cube_types`,
/// ready to be registered on the schema.
struct CubeTypes {
    /// The `{Cube}Record` object plus any extra objects from the dimension walk.
    objects: Vec<Object>,
    /// `{Cube}Filter`, order-by and limit-by inputs plus nested/selectWhere inputs.
    inputs: Vec<InputObject>,
    /// The compare-fields enum plus per-metric (and quantile) `of` enums.
    enums: Vec<Enum>,
    /// Union types accumulated while walking the dimension tree.
    unions: Vec<Union>,
}
1001
/// Builds all dynamic GraphQL types for one cube: the `{Cube}Record` output
/// object (dimension fields, metric fields, `quantile`, `calculate`, joins),
/// the `{Cube}Filter` input, order-by / limit-by inputs, the compare-fields
/// enum, per-metric `of` enums, and any nested or union types produced while
/// walking the dimension tree.
fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
    let record_name = format!("{}Record", cube.name);
    let filter_name = format!("{}Filter", cube.name);
    let compare_enum_name = format!("{}CompareFields", cube.name);

    // Flattened (path, Dimension) pairs for the whole dimension tree.
    let flat_dims = cube.flat_dimensions();

    // One enum listing every dimension path; used for ordering, limitBy,
    // and as the type of the maximum/minimum (argMax/argMin) arguments.
    let mut compare_enum = Enum::new(&compare_enum_name)
        .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
    for (path, _) in &flat_dims {
        compare_enum = compare_enum.item(EnumItem::new(path));
    }

    let mut record_fields: Vec<Field> = Vec::new();
    let mut filter_fields: Vec<InputValue> = Vec::new();
    let mut extra_objects: Vec<Object> = Vec::new();
    let mut extra_inputs: Vec<InputObject> = Vec::new();
    let mut extra_unions: Vec<Union> = Vec::new();

    // `any` is the OR combinator: a list of sub-filters of this same type.
    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
        .description("OR combinator — matches if any sub-filter matches"));

    // Walk the dimension tree, filling the top-level record/filter fields
    // and accumulating nested record/filter/union types into extra_*.
    {
        let mut collector = DimCollector {
            cube_name: &cube.name,
            compare_enum_name: &compare_enum_name,
            filter_name: &filter_name,
            record_fields: &mut record_fields,
            filter_fields: &mut filter_fields,
            extra_objects: &mut extra_objects,
            extra_inputs: &mut extra_inputs,
            extra_unions: &mut extra_unions,
        };
        for node in &cube.dimensions {
            collect_dimension_types(node, "", &mut collector);
        }
    }

    let mut metric_enums: Vec<Enum> = Vec::new();
    // Fallback descriptions for the built-in metric names, used when a
    // metric definition carries no description of its own.
    let builtin_descs: std::collections::HashMap<&str, &str> = [
        ("count", "Count of rows or distinct values"),
        ("sum", "Sum of values"),
        ("avg", "Average of values"),
        ("min", "Minimum value"),
        ("max", "Maximum value"),
        ("uniq", "Count of unique (distinct) values"),
    ].into_iter().collect();

    for metric in &cube.metrics {
        let metric_name = &metric.name;
        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);

        // Per-metric HAVING-style comparison input (operands typed as
        // schema Strings), registered only when the metric supports it.
        if metric.supports_where {
            extra_inputs.push(
                InputObject::new(&select_where_name)
                    .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
                    .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
                    .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
                    .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
                    .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
                    .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
                    .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
            );
        }

        // Per-metric `of` enum listing the dimensions it can aggregate over.
        let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
        let mut of_enum = Enum::new(&of_enum_name)
            .description(format!("Dimension to apply {} aggregation on", metric_name));
        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
        metric_enums.push(of_enum);

        let metric_name_clone = metric_name.clone();
        let return_type_ref = dim_type_to_typeref(&metric.return_type);
        let metric_desc = metric.description.as_deref()
            .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
            .unwrap_or("Aggregate metric");

        // Resolver: the compiled SQL aliases each metric result as
        // `__{alias}` (see `metric_key`); read that key from the row, using
        // the GraphQL alias when present, else the metric name.
        let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
            let default_name = metric_name_clone.clone();
            FieldFuture::new(async move {
                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
                let key = metric_key(alias);
                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                Ok(Some(FieldValue::value(json_to_gql_value(val))))
            })
        })
        .description(metric_desc)
        .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
            .description("Dimension to aggregate on (default: all rows)"));

        // `count` additionally accepts `distinct` as an alias for `of`.
        if metric_name == "count" {
            metric_field = metric_field
                .argument(InputValue::new("distinct", TypeRef::named(&of_enum_name))
                    .description("Count distinct values of this dimension (alias for of, maps to uniqExact)"));
        }

        if metric.supports_where {
            metric_field = metric_field
                .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
                    .description("Post-aggregation filter (HAVING)"))
                .argument(InputValue::new("if", TypeRef::named(&filter_name))
                    .description("Conditional filter for this metric"));
        }

        record_fields.push(metric_field);
    }

    // `quantile(of: ..., level: ...)` — percentile computation
    {
        let of_enum_name = format!("{}_quantile_Of", cube.name);
        let mut of_enum = Enum::new(&of_enum_name)
            .description(format!("Dimension to apply quantile on for {}", cube.name));
        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
        metric_enums.push(of_enum);

        // NOTE(review): despite the name, this clone feeds the `of` argument
        // TypeRef below — the resolver closure does not capture it.
        let of_enum_for_closure = of_enum_name.clone();
        // Resolver mirrors the metric resolver: the value lives under
        // `__{alias}` in the row (default alias "quantile").
        let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
            FieldFuture::new(async move {
                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                let alias = ctx.ctx.field().alias().unwrap_or("quantile");
                let key = metric_key(alias);
                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                Ok(Some(FieldValue::value(json_to_gql_value(val))))
            })
        })
        .description("Compute a quantile (percentile) of a dimension")
        .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
            .description("Dimension to compute quantile on"))
        .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
            .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
        record_fields.push(quantile_field);
    }

    // `calculate(expression: "...")` — runtime expression computation
    {
        // Resolver reads the computed value from `__{alias}` in the row
        // (default alias "calculate").
        let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
            FieldFuture::new(async move {
                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                let alias = ctx.ctx.field().alias().unwrap_or("calculate");
                let key = metric_key(alias);
                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
                Ok(Some(FieldValue::value(json_to_gql_value(val))))
            })
        })
        .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
        .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
            .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
        record_fields.push(calculate_field);
    }

    // Add join fields: joinXxx returns the target cube's Record type
    for jd in &cube.joins {
        let target_record_name = format!("{}Record", jd.target_cube);
        let field_name_owned = jd.field_name.clone();
        let mut join_field = Field::new(
            &jd.field_name,
            TypeRef::named(&target_record_name),
            move |ctx| {
                let field_name = field_name_owned.clone();
                FieldFuture::new(async move {
                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                    // The joined row arrives as a nested JSON object stored
                    // under the join field's name; re-wrap it as a RowMap so
                    // the target Record's resolvers can read it.
                    if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
                        let sub_row: RowMap = obj.iter()
                            .map(|(k, v)| (k.clone(), v.clone()))
                            .collect();
                        Ok(Some(FieldValue::owned_any(sub_row)))
                    } else {
                        Ok(Some(FieldValue::value(Value::Null)))
                    }
                })
            },
        );
        if let Some(desc) = &jd.description {
            join_field = join_field.description(desc);
        }
        record_fields.push(join_field);
    }

    // Assemble the record object and the cube-level filter input.
    let mut record = Object::new(&record_name);
    for f in record_fields { record = record.field(f); }

    let mut filter = InputObject::new(&filter_name)
        .description(format!("Filter conditions for {} query", cube.name));
    for f in filter_fields { filter = filter.field(f); }

    // orderBy input: enum-based ascending/descending for dimension paths,
    // string-based variants for computed/aggregated field names.
    let orderby_input_name = format!("{}OrderByInput", cube.name);
    let orderby_input = InputObject::new(&orderby_input_name)
        .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
        .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
            .description("Sort descending by this field"))
        .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
            .description("Sort ascending by this field"))
        .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
            .description("Sort descending by computed/aggregated field name"))
        .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
            .description("Sort ascending by computed/aggregated field name"));

    // limitBy input: caps (and optionally offsets) rows per `by` group.
    let limitby_input_name = format!("{}LimitByInput", cube.name);
    let limitby_input = InputObject::new(&limitby_input_name)
        .description(format!("Per-group row limit for {}", cube.name))
        .field(InputValue::new("by", TypeRef::named_nn(&compare_enum_name))
            .description("Dimension field to group by"))
        .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
            .description("Maximum rows per group"))
        .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
            .description("Rows to skip per group"));

    let mut objects = vec![record]; objects.extend(extra_objects);
    let mut inputs = vec![filter, orderby_input, limitby_input]; inputs.extend(extra_inputs);
    let mut enums = vec![compare_enum]; enums.extend(metric_enums);

    CubeTypes { objects, inputs, enums, unions: extra_unions }
}
1216
/// Mutable state threaded through `collect_dimension_types` while walking a
/// cube's dimension tree.
struct DimCollector<'a> {
    /// Cube name, used to namespace generated nested type names.
    cube_name: &'a str,
    /// Name of the `{Cube}CompareFields` enum (type of maximum/minimum args).
    compare_enum_name: &'a str,
    /// Name of the cube-level filter input (type of the `if` argument).
    filter_name: &'a str,
    /// Record-object fields being built for the current nesting level.
    record_fields: &'a mut Vec<Field>,
    /// Filter-input fields being built for the current nesting level.
    filter_fields: &'a mut Vec<InputValue>,
    /// Extra generated objects, shared across all nesting levels.
    extra_objects: &'a mut Vec<Object>,
    /// Extra generated inputs (nested filters), shared across all levels.
    extra_inputs: &'a mut Vec<InputObject>,
    /// Generated union types, shared across all nesting levels.
    extra_unions: &'a mut Vec<Union>,
}
1227
1228fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1229    match node {
1230        DimensionNode::Leaf(dim) => {
1231            let col = dim.column.clone();
1232            let is_datetime = dim.dim_type == DimType::DateTime;
1233            let compare_enum = c.compare_enum_name.to_string();
1234            let cube_filter = c.filter_name.to_string();
1235            let mut leaf_field = Field::new(
1236                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1237                move |ctx| {
1238                    let col = col.clone();
1239                    FieldFuture::new(async move {
1240                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1241                        let has_interval = ctx.args.try_get("interval").is_ok();
1242                        let has_max = ctx.args.try_get("maximum").is_ok();
1243                        let has_min = ctx.args.try_get("minimum").is_ok();
1244                        let key = if has_interval || has_max || has_min {
1245                            let name = ctx.ctx.field().alias().unwrap_or(&col);
1246                            dim_agg_key(name)
1247                        } else {
1248                            col.clone()
1249                        };
1250                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1251                        let gql_val = if is_datetime {
1252                            json_to_gql_datetime(val)
1253                        } else {
1254                            json_to_gql_value(val)
1255                        };
1256                        Ok(Some(FieldValue::value(gql_val)))
1257                    })
1258                },
1259            );
1260            if let Some(desc) = &dim.description {
1261                leaf_field = leaf_field.description(desc);
1262            }
1263            leaf_field = leaf_field
1264                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1265                    .description("Return value from row where compare field is maximum (argMax)"))
1266                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1267                    .description("Return value from row where compare field is minimum (argMin)"))
1268                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1269                    .description("Conditional filter for aggregation"))
1270                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1271                    .description("Post-aggregation value filter (HAVING)"));
1272            if is_datetime {
1273                leaf_field = leaf_field
1274                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1275                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1276            }
1277            // Update resolver key: if interval is present, read from the interval expression key
1278            // This is handled by the resolver checking ctx.args for "interval"
1279            c.record_fields.push(leaf_field);
1280            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1281        }
1282        DimensionNode::Group { graphql_name, description, children } => {
1283            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1284            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1285            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1286
1287            let mut child_record_fields: Vec<Field> = Vec::new();
1288            let mut child_filter_fields: Vec<InputValue> = Vec::new();
1289            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1290
1291            let mut child_collector = DimCollector {
1292                cube_name: c.cube_name,
1293                compare_enum_name: c.compare_enum_name,
1294                filter_name: c.filter_name,
1295                record_fields: &mut child_record_fields,
1296                filter_fields: &mut child_filter_fields,
1297                extra_objects: c.extra_objects,
1298                extra_inputs: c.extra_inputs,
1299                extra_unions: c.extra_unions,
1300            };
1301            for child in children {
1302                collect_dimension_types(child, &new_prefix, &mut child_collector);
1303            }
1304
1305            let mut nested_record = Object::new(&nested_record_name);
1306            for f in child_record_fields { nested_record = nested_record.field(f); }
1307
1308            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1309            let mut nested_filter = InputObject::new(&nested_filter_name)
1310                .description(nested_filter_desc);
1311            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1312
1313            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1314                FieldFuture::new(async move {
1315                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1316                    Ok(Some(FieldValue::owned_any(row.clone())))
1317                })
1318            });
1319            if let Some(desc) = description {
1320                group_field = group_field.description(desc);
1321            }
1322            c.record_fields.push(group_field);
1323            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1324            c.extra_objects.push(nested_record);
1325            c.extra_inputs.push(nested_filter);
1326        }
1327        DimensionNode::Array { graphql_name, description, children } => {
1328            let full_path = if prefix.is_empty() {
1329                graphql_name.clone()
1330            } else {
1331                format!("{prefix}_{graphql_name}")
1332            };
1333            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1334            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1335
1336            let mut element_obj = Object::new(&element_type_name);
1337            let mut includes_filter = InputObject::new(&includes_filter_name)
1338                .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1339
1340            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1341
1342            for child in children {
1343                match &child.field_type {
1344                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1345                        let col_name = child.column.clone();
1346                        let is_datetime = *dt == DimType::DateTime;
1347                        let mut field = Field::new(
1348                            &child.graphql_name,
1349                            dim_type_to_typeref(dt),
1350                            move |ctx| {
1351                                let col = col_name.clone();
1352                                FieldFuture::new(async move {
1353                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1354                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1355                                    let gql_val = if is_datetime {
1356                                        json_to_gql_datetime(val)
1357                                    } else {
1358                                        json_to_gql_value(val)
1359                                    };
1360                                    Ok(Some(FieldValue::value(gql_val)))
1361                                })
1362                            },
1363                        );
1364                        if let Some(desc) = &child.description {
1365                            field = field.description(desc);
1366                        }
1367                        element_obj = element_obj.field(field);
1368                        includes_filter = includes_filter.field(
1369                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1370                        );
1371                    }
1372                    crate::cube::definition::ArrayFieldType::Union(variants) => {
1373                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1374
1375                        let mut gql_union = Union::new(&union_name);
1376                        let mut variant_objects = Vec::new();
1377
1378                        for v in variants {
1379                            let variant_obj_name = v.type_name.clone();
1380                            let field_name = v.field_name.clone();
1381                            let source_type = v.source_type.clone();
1382                            let type_ref = dim_type_to_typeref(&source_type);
1383
1384                            let val_col = child.column.clone();
1385                            let variant_obj = Object::new(&variant_obj_name)
1386                                .field(Field::new(
1387                                    &field_name,
1388                                    type_ref,
1389                                    move |ctx| {
1390                                        let col = val_col.clone();
1391                                        FieldFuture::new(async move {
1392                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1393                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1394                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
1395                                        })
1396                                    },
1397                                ));
1398                            gql_union = gql_union.possible_type(&variant_obj_name);
1399                            variant_objects.push(variant_obj);
1400                        }
1401
1402                        let col_name = child.column.clone();
1403                        let type_col = children.iter()
1404                            .find(|f| f.graphql_name == "Type")
1405                            .map(|f| f.column.clone())
1406                            .unwrap_or_default();
1407                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1408
1409                        let mut field = Field::new(
1410                            &child.graphql_name,
1411                            TypeRef::named(&union_name),
1412                            move |ctx| {
1413                                let col = col_name.clone();
1414                                let tcol = type_col.clone();
1415                                let vars = variants_clone.clone();
1416                                FieldFuture::new(async move {
1417                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1418                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1419                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
1420                                        return Ok(None);
1421                                    }
1422                                    let type_str = row.get(&tcol)
1423                                        .and_then(|v| v.as_str())
1424                                        .unwrap_or("");
1425                                    let resolved_type = resolve_union_typename(type_str, &vars);
1426                                    let mut elem = RowMap::new();
1427                                    elem.insert(col.clone(), raw_val);
1428                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1429                                })
1430                            },
1431                        );
1432                        if let Some(desc) = &child.description {
1433                            field = field.description(desc);
1434                        }
1435                        element_obj = element_obj.field(field);
1436
1437                        includes_filter = includes_filter.field(
1438                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1439                        );
1440
1441                        union_registrations.push((union_name, gql_union, variant_objects));
1442                    }
1443                }
1444            }
1445
1446            // Register union types and variant objects
1447            for (_, union_type, variant_objs) in union_registrations {
1448                c.extra_objects.extend(variant_objs);
                // Variant objects register like any other Object (via
                // extra_objects); the Union type itself goes through
                // extra_unions so the schema builder can register it as a
                // proper GraphQL union.
1456                c.extra_unions.push(union_type);
1457            }
1458
1459            // Build the list resolver: zip parallel array columns into element objects
1460            let child_columns: Vec<(String, String)> = children.iter()
1461                .map(|f| (f.graphql_name.clone(), f.column.clone()))
1462                .collect();
1463            let element_type_name_clone = element_type_name.clone();
1464
1465            let mut array_field = Field::new(
1466                graphql_name,
1467                TypeRef::named_nn_list_nn(&element_type_name),
1468                move |ctx| {
1469                    let cols = child_columns.clone();
1470                    let _etype = element_type_name_clone.clone();
1471                    FieldFuture::new(async move {
1472                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1473                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1474                            .map(|(gql_name, col)| {
1475                                let arr = row.get(col)
1476                                    .and_then(|v| v.as_array())
1477                                    .cloned()
1478                                    .unwrap_or_default();
1479                                (gql_name.as_str(), arr)
1480                            })
1481                            .collect();
1482
1483                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1484                        let mut elements = Vec::with_capacity(len);
1485                        for i in 0..len {
1486                            let mut elem = RowMap::new();
1487                            for (gql_name, arr) in &arrays {
1488                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1489                                elem.insert(gql_name.to_string(), val);
1490                            }
1491                            // Also store by column name for Union resolvers
1492                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1493                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1494                                elem.insert(col.clone(), val);
1495                            }
1496                            elements.push(FieldValue::owned_any(elem));
1497                        }
1498                        Ok(Some(FieldValue::list(elements)))
1499                    })
1500                },
1501            );
1502            if let Some(desc) = description {
1503                array_field = array_field.description(desc);
1504            }
1505            c.record_fields.push(array_field);
1506
1507            // Wrap the element-level IncludesFilter in an ArrayFilter so the
1508            // parser can find the `includes` key (compiler/filter.rs expects
1509            // `{ includes: [{ Name: { is: "..." } }] }`).
1510            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
1511            let wrapper_filter = InputObject::new(&wrapper_filter_name)
1512                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
1513                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
1514                    .description("Match rows where at least one array element satisfies all conditions"));
1515            c.extra_inputs.push(wrapper_filter);
1516
1517            c.filter_fields.push(InputValue::new(
1518                graphql_name,
1519                TypeRef::named(&wrapper_filter_name),
1520            ));
1521
1522            c.extra_objects.push(element_obj);
1523            c.extra_inputs.push(includes_filter);
1524        }
1525    }
1526}
1527
1528/// Resolve the GraphQL Union __typename from a discriminator column value.
1529///
1530/// Walks variants in order; the first whose `source_type_names` contains
1531/// `type_str` wins.  If none match, the last variant is used as fallback
1532/// (conventionally the "catch-all" variant such as JSON).
1533fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1534    for v in variants {
1535        if v.source_type_names.iter().any(|s| s == type_str) {
1536            return v.type_name.clone();
1537        }
1538    }
1539    variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1540}
1541
1542fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1543    match dt {
1544        DimType::String | DimType::Decimal | DimType::BigInteger | DimType::Date => TypeRef::named(TypeRef::STRING),
1545        DimType::DateTime => TypeRef::named("DateTime"),
1546        DimType::Int => TypeRef::named(TypeRef::INT),
1547        DimType::Float => TypeRef::named(TypeRef::FLOAT),
1548        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1549    }
1550}
1551
1552fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1553    match dt {
1554        DimType::String => "StringFilter",
1555        DimType::Int => "IntFilter",
1556        DimType::Float => "FloatFilter",
1557        DimType::Decimal => "DecimalFilter",
1558        DimType::BigInteger => "BigIntFilter",
1559        DimType::Date => "DateFilter",
1560        DimType::DateTime => "DateTimeFilter",
1561        DimType::Bool => "BoolFilter",
1562    }
1563}
1564
1565pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1566    match v {
1567        serde_json::Value::Null => Value::Null,
1568        serde_json::Value::Bool(b) => Value::from(b),
1569        serde_json::Value::Number(n) => {
1570            if let Some(i) = n.as_i64() { Value::from(i) }
1571            else if let Some(f) = n.as_f64() { Value::from(f) }
1572            else { Value::from(n.to_string()) }
1573        }
1574        serde_json::Value::String(s) => Value::from(s),
1575        _ => Value::from(v.to_string()),
1576    }
1577}
1578
1579/// Build a JoinExpr from a JoinDef and target cube definition.
1580/// Inspects the sub-selection of the join field to determine which columns to SELECT.
1581fn build_join_expr(
1582    jd: &crate::cube::definition::JoinDef,
1583    target_cube: &CubeDefinition,
1584    sub_field: &async_graphql::SelectionField<'_>,
1585    network: &str,
1586    join_idx: usize,
1587) -> JoinExpr {
1588    let target_flat = target_cube.flat_dimensions();
1589    let target_table = target_cube.table_for_chain(network);
1590
1591    let mut requested_paths = HashSet::new();
1592    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1593
1594    let mut selects: Vec<SelectExpr> = target_flat.iter()
1595        .filter(|(path, _)| requested_paths.contains(path))
1596        .map(|(_, dim)| SelectExpr::Column {
1597            column: dim.column.clone(),
1598            alias: None,
1599        })
1600        .collect();
1601
1602    if selects.is_empty() {
1603        selects = target_flat.iter()
1604            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1605            .collect();
1606    }
1607
1608    let is_aggregate = target_flat.iter().any(|(_, dim)| is_aggregate_expr(&dim.column));
1609
1610    let group_by = if is_aggregate {
1611        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1612        for sel in &selects {
1613            if let SelectExpr::Column { column, .. } = sel {
1614                if !is_aggregate_expr(column) && !gb.contains(column) {
1615                    gb.push(column.clone());
1616                }
1617            }
1618        }
1619        gb
1620    } else {
1621        vec![]
1622    };
1623
1624    JoinExpr {
1625        schema: target_cube.schema.clone(),
1626        table: target_table,
1627        alias: format!("_j{}", join_idx),
1628        conditions: jd.conditions.clone(),
1629        selects,
1630        group_by,
1631        use_final: target_cube.use_final,
1632        is_aggregate,
1633        target_cube: jd.target_cube.clone(),
1634        join_field: sub_field.name().to_string(),
1635        join_type: jd.join_type.clone(),
1636    }
1637}
1638
1639/// Convert a ClickHouse DateTime value to ISO 8601 format.
1640/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
1641fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1642    match v {
1643        serde_json::Value::String(s) => {
1644            let iso = if s.contains('T') {
1645                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1646            } else {
1647                let replaced = s.replacen(' ', "T", 1);
1648                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1649            };
1650            Value::from(iso)
1651        }
1652        other => json_to_gql_value(other),
1653    }
1654}