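//! activecube_rs/schema/generator.rs
//!
//! Builds the dynamic async-graphql schema (chain wrapper types, cube fields,
//! and the `_cubeMetadata` field) from a cube registry, SQL dialect and executor.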

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{DimAggType, FilterNode, SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{ChainGroup, CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Returns the canonical row key for a metric's SQL alias: the alias prefixed
/// with two underscores. Shared by the parser and the resolver.
pub fn metric_key(alias: &str) -> String {
    ["__", alias].concat()
}
17
/// Returns the canonical row key for a dimension aggregate / time interval SQL
/// alias: the alias prefixed with `__da_`.
pub fn dim_agg_key(alias: &str) -> String {
    const PREFIX: &str = "__da_";
    let mut key = String::with_capacity(PREFIX.len() + alias.len());
    key.push_str(PREFIX);
    key.push_str(alias);
    key
}
20
/// Async function type that executes a compiled SQL query and returns rows.
/// The service layer provides this — the library never touches a database directly.
///
/// The callable receives the SQL text and its bind values and returns a boxed,
/// `Send` future resolving to the result rows, or to a `String` error message.
/// It is shared behind an `Arc` and must be `Send + Sync`.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
28
/// A custom argument on the chain wrapper field (e.g. dataset, aggregates).
/// The library registers it on the GraphQL wrapper type and stores the resolved
/// value in `ChainContext.extra` for child cube resolvers to read.
///
/// All fields are plain strings, so the type also supports equality comparison
/// (useful when comparing or de-duplicating configurations).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WrapperArg {
    /// GraphQL argument name (e.g. "dataset").
    pub name: String,
    /// Name of a type already registered in the schema (e.g. "Dataset").
    pub type_ref: String,
    /// Description shown in schema introspection.
    pub description: String,
    /// Enum values, used by builder schema generation. None for non-enum types.
    pub values: Option<Vec<String>>,
}
43
/// Describes one top-level chain wrapper type (e.g. EVM, Solana, Trading).
///
/// Each config becomes one wrapper object type plus one field of the same name
/// on the root query object.
#[derive(Debug, Clone)]
pub struct ChainGroupConfig {
    /// Wrapper type name in the schema, e.g. "EVM", "Solana", "Trading".
    pub name: String,
    /// Which ChainGroup enum value this config represents.
    pub group: ChainGroup,
    /// Available networks. EVM: ["eth","bsc"]; Solana: ["sol"]; Trading: all chains.
    /// The first entry is the network used when no `network` argument applies.
    pub networks: Vec<String>,
    /// If true the wrapper type takes a `network` argument (EVM style).
    /// If false the chain is implicit (Solana style) or cross-chain (Trading).
    pub has_network_arg: bool,
    /// Additional arguments on the wrapper field, passed through to child resolvers
    /// via `ChainContext.extra`.
    pub extra_args: Vec<WrapperArg>,
    /// Custom name for the network enum type registered in the schema.
    /// Defaults to `"{name}Network"` (e.g. `EVMNetwork`) when `None`.
    /// Set to e.g. `Some("evm_network".into())` for Bitquery-compatible naming.
    pub network_enum_name: Option<String>,
}
64
/// Callback to transform the resolved table name based on `ChainContext`.
/// The cube resolver applies it to the query IR's table name before validation
/// and SQL compilation, and only when the parent value is a `ChainContext`.
/// Arguments: `(table_name, chain_context) -> transformed_table_name`.
pub type TableNameTransform = Arc<dyn Fn(&str, &ChainContext) -> String + Send + Sync>;
69
/// Configuration for supported networks (chains) and optional stats collection.
pub struct SchemaConfig {
    /// Chain wrapper groups to expose; one wrapper type and query field per entry.
    pub chain_groups: Vec<ChainGroupConfig>,
    /// Name for the root query.
    /// NOTE(review): `build_schema` in this file names the root object "Query"
    /// and does not read this field — confirm where it is consumed.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    pub stats_callback: Option<StatsCallback>,
    /// Optional hook to transform table names based on chain context (e.g. dataset suffix).
    pub table_name_transform: Option<TableNameTransform>,
    /// Additional global enum types to register in the schema (e.g. Dataset, Aggregates).
    pub extra_types: Vec<Enum>,
}
82
83impl Default for SchemaConfig {
84    fn default() -> Self {
85        Self {
86            chain_groups: vec![
87                ChainGroupConfig {
88                    name: "EVM".to_string(),
89                    group: ChainGroup::Evm,
90                    networks: vec!["eth".into(), "bsc".into()],
91                    has_network_arg: true,
92                    extra_args: vec![],
93                    network_enum_name: None,
94                },
95                ChainGroupConfig {
96                    name: "Solana".to_string(),
97                    group: ChainGroup::Solana,
98                    networks: vec!["sol".into()],
99                    has_network_arg: false,
100                    extra_args: vec![],
101                    network_enum_name: None,
102                },
103                ChainGroupConfig {
104                    name: "Trading".to_string(),
105                    group: ChainGroup::Trading,
106                    networks: vec!["sol".into(), "eth".into(), "bsc".into()],
107                    has_network_arg: false,
108                    extra_args: vec![],
109                    network_enum_name: None,
110                },
111            ],
112            root_query_name: "ChainStream".to_string(),
113            stats_callback: None,
114            table_name_transform: None,
115            extra_types: vec![],
116        }
117    }
118}
119
/// Stored in the parent value of chain wrapper types so cube resolvers can
/// retrieve the active chain without a `network` argument on themselves.
/// Service-layer extra_args values are available in the `extra` map.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so the context can be logged,
/// copied and compared (e.g. in table-name transform tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChainContext {
    /// Network selected for this wrapper (e.g. "eth", "sol").
    pub network: String,
    /// Resolved wrapper extra-argument values, keyed by argument name.
    pub extra: HashMap<String, String>,
}
127
128/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
129///
130/// Schema shape (Bitquery-aligned):
131/// ```graphql
132/// type Query {
133///   EVM(network: EVMNetwork!): EVM!
134///   Solana: Solana!
135///   Trading: Trading!
136///   _cubeMetadata: String!
137/// }
138/// ```
139pub fn build_schema(
140    registry: CubeRegistry,
141    dialect: Arc<dyn SqlDialect>,
142    executor: QueryExecutor,
143    config: SchemaConfig,
144) -> Result<Schema, SchemaError> {
145    let mut builder = Schema::build("Query", None, None);
146
147    // --- shared input / enum types -------------------------------------------
148    builder = builder.register(filter_types::build_limit_input());
149    // LimitByInput is now generated per-cube in build_cube_types() with typed enum for `by`
150    builder = builder.register(
151        Enum::new("OrderDirection")
152            .description("Sort direction")
153            .item(EnumItem::new("ASC").description("Ascending"))
154            .item(EnumItem::new("DESC").description("Descending")),
155    );
156    for input in filter_types::build_filter_primitives() {
157        builder = builder.register(input);
158    }
159    builder = builder.register(
160        Scalar::new("DateTime")
161            .description("ISO 8601 date-time string (e.g. \"2026-04-01T00:00:00Z\")"),
162    );
163    builder = builder.register(
164        Scalar::new("ISO8601DateTime")
165            .description("ISO 8601 date-time string (alias for DateTime, V1 compatibility)"),
166    );
167    builder = builder.register(
168        Enum::new("TimeUnit")
169            .description("Time unit for interval bucketing")
170            .item(EnumItem::new("seconds"))
171            .item(EnumItem::new("minutes"))
172            .item(EnumItem::new("hours"))
173            .item(EnumItem::new("days"))
174            .item(EnumItem::new("weeks"))
175            .item(EnumItem::new("months")),
176    );
177    builder = builder.register(
178        InputObject::new("TimeIntervalInput")
179            .description("Time bucketing interval for DateTime dimensions")
180            .field(InputValue::new("in", TypeRef::named_nn("TimeUnit")).description("Time unit"))
181            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)).description("Number of time units")),
182    );
183    builder = builder.register(
184        InputObject::new("DimSelectWhere")
185            .description("Post-aggregation filter for dimension values (HAVING clause)")
186            .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
187            .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal"))
188            .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
189            .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal"))
190            .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to"))
191            .field(InputValue::new("ne", TypeRef::named(TypeRef::STRING)).description("Not equal to")),
192    );
193
194    for extra_enum in config.extra_types {
195        builder = builder.register(extra_enum);
196    }
197
198    // --- per-group network enums ---------------------------------------------
199    for grp in &config.chain_groups {
200        if grp.has_network_arg {
201            let enum_name = grp.network_enum_name.clone()
202                .unwrap_or_else(|| format!("{}Network", grp.name));
203            let mut net_enum = Enum::new(&enum_name)
204                .description(format!("{} network selector", grp.name));
205            for net in &grp.networks {
206                net_enum = net_enum.item(EnumItem::new(net));
207            }
208            builder = builder.register(net_enum);
209        }
210    }
211
212    // --- register cube types once (shared across chain groups) ----------------
213    let mut registered_cubes: HashSet<String> = HashSet::new();
214    for cube in registry.cubes() {
215        if registered_cubes.contains(&cube.name) { continue; }
216        registered_cubes.insert(cube.name.clone());
217
218        let types = build_cube_types(cube);
219        for obj in types.objects { builder = builder.register(obj); }
220        for inp in types.inputs { builder = builder.register(inp); }
221        for en in types.enums { builder = builder.register(en); }
222        for un in types.unions { builder = builder.register(un); }
223    }
224
225    // --- build chain wrapper types + Query fields ----------------------------
226    let mut query = Object::new("Query");
227
228    for grp in &config.chain_groups {
229        let wrapper_type_name = grp.name.clone();
230
231        // Build the wrapper Object (e.g. "EVM", "Solana", "Trading") and attach
232        // cube fields that belong to this group.
233        let mut wrapper_obj = Object::new(&wrapper_type_name);
234
235        for cube in registry.cubes() {
236            if !cube.chain_groups.contains(&grp.group) { continue; }
237
238            let cube_field = build_cube_field(
239                cube,
240                dialect.clone(),
241                executor.clone(),
242                config.stats_callback.clone(),
243            );
244            wrapper_obj = wrapper_obj.field(cube_field);
245        }
246        builder = builder.register(wrapper_obj);
247
248        // Wrapper resolver on Query — stores ChainContext for child resolvers.
249        let default_network = grp.networks.first().cloned().unwrap_or_default();
250        let has_net_arg = grp.has_network_arg;
251        let net_enum_name = grp.network_enum_name.clone()
252            .unwrap_or_else(|| format!("{}Network", grp.name));
253        let wrapper_type_for_resolver = wrapper_type_name.clone();
254        let extra_arg_names: Vec<String> = grp.extra_args.iter().map(|a| a.name.clone()).collect();
255
256        let mut wrapper_field = Field::new(
257            &wrapper_type_name,
258            TypeRef::named_nn(&wrapper_type_name),
259            move |ctx| {
260                let default = default_network.clone();
261                let has_arg = has_net_arg;
262                let arg_names = extra_arg_names.clone();
263                FieldFuture::new(async move {
264                    let network = if has_arg {
265                        ctx.args.try_get("network")
266                            .ok()
267                            .and_then(|v| v.enum_name().ok().map(|s| s.to_string()))
268                            .unwrap_or(default)
269                    } else {
270                        default
271                    };
272                    let mut extra = HashMap::new();
273                    for arg_name in &arg_names {
274                        if let Ok(val) = ctx.args.try_get(arg_name) {
275                            let resolved = val.enum_name().ok().map(|s| s.to_string())
276                                .or_else(|| val.boolean().ok().map(|b| b.to_string()))
277                                .or_else(|| val.string().ok().map(|s| s.to_string()));
278                            if let Some(v) = resolved {
279                                extra.insert(arg_name.clone(), v);
280                            }
281                        }
282                    }
283                    Ok(Some(FieldValue::owned_any(ChainContext { network, extra })))
284                })
285            },
286        );
287
288        if grp.has_network_arg {
289            wrapper_field = wrapper_field.argument(
290                InputValue::new("network", TypeRef::named_nn(&net_enum_name))
291                    .description(format!("{} network to query", wrapper_type_for_resolver)),
292            );
293        }
294        for wa in &grp.extra_args {
295            wrapper_field = wrapper_field.argument(
296                InputValue::new(&wa.name, TypeRef::named(&wa.type_ref))
297                    .description(&wa.description),
298            );
299        }
300
301        query = query.field(wrapper_field);
302    }
303
304    // --- _cubeMetadata -------------------------------------------------------
305    let metadata_registry = Arc::new(registry.clone());
306    let metadata_groups: Vec<(String, ChainGroup)> = config.chain_groups.iter()
307        .map(|g| (g.name.clone(), g.group.clone()))
308        .collect();
309    let metadata_field = Field::new(
310        "_cubeMetadata",
311        TypeRef::named_nn(TypeRef::STRING),
312        move |_ctx| {
313            let reg = metadata_registry.clone();
314            let groups = metadata_groups.clone();
315            FieldFuture::new(async move {
316                let mut group_metadata: Vec<serde_json::Value> = Vec::new();
317                for (group_name, group_enum) in &groups {
318                    let cubes_in_group: Vec<serde_json::Value> = reg.cubes()
319                        .filter(|c| c.chain_groups.contains(group_enum))
320                        .map(serialize_cube_metadata)
321                        .collect();
322                    group_metadata.push(serde_json::json!({
323                        "group": group_name,
324                        "cubes": cubes_in_group,
325                    }));
326                }
327                let json = serde_json::to_string(&group_metadata).unwrap_or_default();
328                Ok(Some(FieldValue::value(Value::from(json))))
329            })
330        },
331    )
332    .description("Internal: returns JSON metadata about all cubes grouped by chain");
333    query = query.field(metadata_field);
334
335    builder = builder.register(query);
336    builder = builder.data(registry);
337
338    if let Some(transform) = config.table_name_transform.clone() {
339        builder = builder.data(transform);
340    }
341
342    builder.finish()
343}
344
345fn serialize_cube_metadata(cube: &CubeDefinition) -> serde_json::Value {
346    serde_json::json!({
347        "name": cube.name,
348        "description": cube.description,
349        "schema": cube.schema,
350        "tablePattern": cube.table_pattern,
351        "chainGroups": cube.chain_groups.iter().map(|g| format!("{:?}", g)).collect::<Vec<_>>(),
352        "metrics": cube.metrics.iter().map(|m| {
353            let mut obj = serde_json::json!({
354                "name": m.name,
355                "returnType": format!("{:?}", m.return_type),
356            });
357            if let Some(ref tmpl) = m.expression_template {
358                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
359            }
360            if let Some(ref desc) = m.description {
361                obj["description"] = serde_json::Value::String(desc.clone());
362            }
363            obj
364        }).collect::<Vec<_>>(),
365        "selectors": cube.selectors.iter().map(|s| {
366            serde_json::json!({
367                "name": s.graphql_name,
368                "column": s.column,
369                "type": format!("{:?}", s.dim_type),
370            })
371        }).collect::<Vec<_>>(),
372        "dimensions": serialize_dims(&cube.dimensions),
373        "joins": cube.joins.iter().map(|j| {
374            serde_json::json!({
375                "field": j.field_name,
376                "target": j.target_cube,
377                "joinType": format!("{:?}", j.join_type),
378            })
379        }).collect::<Vec<_>>(),
380        "tableRoutes": cube.table_routes.iter().map(|r| {
381            serde_json::json!({
382                "schema": r.schema,
383                "tablePattern": r.table_pattern,
384                "availableColumns": r.available_columns,
385                "priority": r.priority,
386            })
387        }).collect::<Vec<_>>(),
388        "defaultLimit": cube.default_limit,
389        "maxLimit": cube.max_limit,
390    })
391}
392
393/// Build a single cube Field that reads `network` from the parent ChainContext.
394fn build_cube_field(
395    cube: &CubeDefinition,
396    dialect: Arc<dyn SqlDialect>,
397    executor: QueryExecutor,
398    stats_cb: Option<StatsCallback>,
399) -> Field {
400    let cube_name = cube.name.clone();
401    let orderby_input_name = format!("{}OrderByInput", cube.name);
402    let cube_description = cube.description.clone();
403
404    let mut field = Field::new(
405        &cube.name,
406        TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
407        move |ctx| {
408            let cube_name = cube_name.clone();
409            let dialect = dialect.clone();
410            let executor = executor.clone();
411            let stats_cb = stats_cb.clone();
412            FieldFuture::new(async move {
413                let registry = ctx.ctx.data::<CubeRegistry>()?;
414
415                let chain_ctx = ctx.parent_value
416                    .try_downcast_ref::<ChainContext>().ok();
417                let network = chain_ctx
418                    .map(|c| c.network.as_str())
419                    .unwrap_or("sol");
420
421                let cube_def = registry.get(&cube_name).ok_or_else(|| {
422                    async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
423                })?;
424
425                let metric_requests = extract_metric_requests(&ctx, cube_def);
426                let quantile_requests = extract_quantile_requests(&ctx);
427                let calculate_requests = extract_calculate_requests(&ctx);
428                let field_aliases = extract_field_aliases(&ctx, cube_def);
429                let dim_agg_requests = extract_dim_agg_requests(&ctx, cube_def);
430                let time_intervals = extract_time_interval_requests(&ctx, cube_def);
431                let requested = extract_requested_fields(&ctx, cube_def);
432                let mut ir = compiler::parser::parse_cube_query(
433                    cube_def,
434                    network,
435                    &ctx.args,
436                    &metric_requests,
437                    &quantile_requests,
438                    &calculate_requests,
439                    &field_aliases,
440                    &dim_agg_requests,
441                    &time_intervals,
442                    Some(requested),
443                )?;
444
445                let mut join_idx = 0usize;
446                for sub_field in ctx.ctx.field().selection_set() {
447                    let fname = sub_field.name().to_string();
448                    let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
449                    if let Some(jd) = join_def {
450                        if let Some(target_cube) = registry.get(&jd.target_cube) {
451                            let join_expr = build_join_expr(
452                                jd, target_cube, &sub_field, network, join_idx,
453                            );
454                            ir.joins.push(join_expr);
455                            join_idx += 1;
456                        }
457                    }
458                }
459
460                if let Ok(transform) = ctx.ctx.data::<TableNameTransform>() {
461                    if let Some(cctx) = chain_ctx {
462                        ir.table = transform(&ir.table, cctx);
463                    }
464                }
465
466                let validated = compiler::validator::validate(ir)?;
467                let result = dialect.compile(&validated);
468                let sql = result.sql;
469                let bindings = result.bindings;
470
471                let rows = executor(sql.clone(), bindings).await.map_err(|e| {
472                    async_graphql::Error::new(format!("Query execution failed: {e}"))
473                })?;
474
475                let rows = if result.alias_remap.is_empty() {
476                    rows
477                } else {
478                    rows.into_iter().map(|mut row| {
479                        for (alias, original) in &result.alias_remap {
480                            if let Some(val) = row.shift_remove(alias) {
481                                row.entry(original.clone()).or_insert(val);
482                            }
483                        }
484                        row
485                    }).collect()
486                };
487
488                let rows: Vec<RowMap> = if validated.joins.is_empty() {
489                    rows
490                } else {
491                    rows.into_iter().map(|mut row| {
492                        for join in &validated.joins {
493                            let prefix = format!("{}.", join.alias);
494                            let mut sub_row = RowMap::new();
495                            let keys: Vec<String> = row.keys()
496                                .filter(|k| k.starts_with(&prefix))
497                                .cloned()
498                                .collect();
499                            for key in keys {
500                                if let Some(val) = row.shift_remove(&key) {
501                                    sub_row.insert(key[prefix.len()..].to_string(), val);
502                                }
503                            }
504                            let obj: serde_json::Map<String, serde_json::Value> =
505                                sub_row.into_iter().collect();
506                            row.insert(
507                                join.join_field.clone(),
508                                serde_json::Value::Object(obj),
509                            );
510                        }
511                        row
512                    }).collect()
513                };
514
515                let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
516                    .or_else(|| stats_cb.clone());
517                if let Some(cb) = effective_cb {
518                    let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
519                    cb(stats);
520                }
521
522                let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
523                Ok(Some(FieldValue::list(values)))
524            })
525        },
526    );
527    if !cube_description.is_empty() {
528        field = field.description(&cube_description);
529    }
530    field = field
531        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
532            .description("Filter conditions"))
533        .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
534            .description("Pagination control"))
535        .argument(InputValue::new("limitBy", TypeRef::named(format!("{}LimitByInput", cube.name)))
536            .description("Per-group row limit"))
537        .argument(InputValue::new("orderBy", TypeRef::named(&orderby_input_name))
538            .description("Sort order (Bitquery-compatible)"));
539
540    for sel in &cube.selectors {
541        let filter_type = dim_type_to_filter_name(&sel.dim_type);
542        field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
543            .description(format!("Shorthand filter for {}", sel.graphql_name)));
544    }
545
546    field
547}
548
549fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
550    serde_json::Value::Array(dims.iter().map(|d| match d {
551        DimensionNode::Leaf(dim) => {
552            let mut obj = serde_json::json!({
553                "name": dim.graphql_name,
554                "column": dim.column,
555                "type": format!("{:?}", dim.dim_type),
556            });
557            if let Some(desc) = &dim.description {
558                obj["description"] = serde_json::Value::String(desc.clone());
559            }
560            obj
561        },
562        DimensionNode::Group { graphql_name, description, children } => {
563            let mut obj = serde_json::json!({
564                "name": graphql_name,
565                "children": serialize_dims(children),
566            });
567            if let Some(desc) = description {
568                obj["description"] = serde_json::Value::String(desc.clone());
569            }
570            obj
571        },
572        DimensionNode::Array { graphql_name, description, children } => {
573            let fields: Vec<serde_json::Value> = children.iter().map(|f| {
574                let type_value = match &f.field_type {
575                    crate::cube::definition::ArrayFieldType::Scalar(dt) => serde_json::json!({
576                        "kind": "scalar",
577                        "scalarType": format!("{:?}", dt),
578                    }),
579                    crate::cube::definition::ArrayFieldType::Union(variants) => serde_json::json!({
580                        "kind": "union",
581                        "variants": variants.iter().map(|v| serde_json::json!({
582                            "typeName": v.type_name,
583                            "fieldName": v.field_name,
584                            "sourceType": format!("{:?}", v.source_type),
585                        })).collect::<Vec<_>>(),
586                    }),
587                };
588                let mut field_obj = serde_json::json!({
589                    "name": f.graphql_name,
590                    "column": f.column,
591                    "type": type_value,
592                });
593                if let Some(desc) = &f.description {
594                    field_obj["description"] = serde_json::Value::String(desc.clone());
595                }
596                field_obj
597            }).collect();
598            let mut obj = serde_json::json!({
599                "name": graphql_name,
600                "kind": "array",
601                "fields": fields,
602            });
603            if let Some(desc) = description {
604                obj["description"] = serde_json::Value::String(desc.clone());
605            }
606            obj
607        },
608    }).collect())
609}
610
611/// Extract metric requests from the GraphQL selection set by inspecting
612/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
613/// the "count" field in the selection set and extract its `of` argument.
614fn extract_metric_requests(
615    ctx: &async_graphql::dynamic::ResolverContext,
616    cube: &CubeDefinition,
617) -> Vec<compiler::parser::MetricRequest> {
618    let mut requests = Vec::new();
619
620    for sub_field in ctx.ctx.field().selection_set() {
621        let name = sub_field.name();
622        if !cube.has_metric(name) {
623            continue;
624        }
625
626        let args = match sub_field.arguments() {
627            Ok(args) => args,
628            Err(_) => continue,
629        };
630
631        let of_dimension = args
632            .iter()
633            .find(|(k, _)| k.as_str() == "of")
634            .and_then(|(_, v)| match v {
635                async_graphql::Value::Enum(e) => Some(e.to_string()),
636                async_graphql::Value::String(s) => Some(s.clone()),
637                _ => None,
638            })
639            .unwrap_or_else(|| "*".to_string());
640
641        let select_where_value = args
642            .iter()
643            .find(|(k, _)| k.as_str() == "selectWhere")
644            .map(|(_, v)| v.clone());
645
646        let condition_filter = args
647            .iter()
648            .find(|(k, _)| k.as_str() == "if")
649            .and_then(|(_, v)| {
650                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
651                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
652            });
653
654        let gql_alias = sub_field.alias().unwrap_or(name).to_string();
655        requests.push(compiler::parser::MetricRequest {
656            function: name.to_string(),
657            alias: gql_alias,
658            of_dimension,
659            select_where_value,
660            condition_filter,
661        });
662    }
663
664    requests
665}
666
667fn extract_requested_fields(
668    ctx: &async_graphql::dynamic::ResolverContext,
669    cube: &CubeDefinition,
670) -> HashSet<String> {
671    let mut fields = HashSet::new();
672    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
673    fields
674}
675
676fn collect_selection_paths(
677    field: &async_graphql::SelectionField<'_>,
678    prefix: &str,
679    out: &mut HashSet<String>,
680    metrics: &[MetricDef],
681) {
682    for sub in field.selection_set() {
683        let name = sub.name();
684        if metrics.iter().any(|m| m.name == name) {
685            continue;
686        }
687        let path = if prefix.is_empty() {
688            name.to_string()
689        } else {
690            format!("{prefix}_{name}")
691        };
692        let has_children = sub.selection_set().next().is_some();
693        if has_children {
694            collect_selection_paths(&sub, &path, out, metrics);
695        } else {
696            out.insert(path);
697        }
698    }
699}
700
/// Maps `{parent_path}_{graphql_alias}` → ClickHouse column name for aliased dimension fields.
/// Used by `calculate` to resolve `$TokenSupplyUpdate_post` → column.
///
/// Stored as a list of `(alias_path, column)` pairs rather than a hash map.
pub type FieldAliasMap = Vec<(String, String)>;
704
/// Describes a `quantile(of: ..., level: ...)` request.
///
/// Derives `Debug`/`Clone`/`PartialEq` so requests can be logged, copied and
/// compared; `Eq` is not derived because `level` is a float.
#[derive(Debug, Clone, PartialEq)]
pub struct QuantileRequest {
    /// Result key: the GraphQL alias, or `"quantile"` when none was given.
    pub alias: String,
    /// Dimension the quantile is computed over (`"*"` when `of` is absent).
    pub of_dimension: String,
    /// Requested quantile level (0.5 when `level` is absent).
    pub level: f64,
}
711
/// Describes a `calculate(expression: "...")` request.
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so requests can be logged, copied
/// and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CalculateRequest {
    /// Result key under which the computed value is returned.
    pub alias: String,
    /// Raw expression text taken from the `expression` argument.
    pub expression: String,
}
717
/// Describes a time interval bucketing request.
/// `Time(interval: {in: minutes, count: 1})` → replaces column with `toStartOfInterval(ts, ...)`
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so requests can be logged, copied
/// and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeIntervalRequest {
    /// Selection path of the bucketed field.
    pub field_path: String,
    /// GraphQL alias of the selected field.
    pub graphql_alias: String,
    /// Column the interval is applied to.
    pub column: String,
    /// Interval unit (the `in` value, e.g. `minutes`).
    pub unit: String,
    /// Number of units per bucket.
    pub count: i64,
}
727
/// Describes a dimension-level aggregation request extracted from the selection set.
/// `PostBalance(maximum: Block_Slot)` → DimAggRequest { agg_type: ArgMax, ... }
pub struct DimAggRequest {
    /// Selection path of the aggregated field.
    pub field_path: String,
    /// GraphQL alias of the selected field.
    pub graphql_alias: String,
    /// Column whose value is returned by the aggregation.
    pub value_column: String,
    /// Which aggregation to apply (e.g. `ArgMax` for `maximum:`).
    pub agg_type: DimAggType,
    /// Column the aggregation compares on (the argument's target, e.g. `Block_Slot`).
    pub compare_column: String,
    /// Optional filter condition attached to this aggregation.
    pub condition_filter: Option<FilterNode>,
    /// Optional raw `selectWhere` argument value, kept unparsed.
    pub select_where_value: Option<async_graphql::Value>,
}
739
740fn extract_quantile_requests(
741    ctx: &async_graphql::dynamic::ResolverContext,
742) -> Vec<QuantileRequest> {
743    let mut requests = Vec::new();
744    for sub_field in ctx.ctx.field().selection_set() {
745        if sub_field.name() != "quantile" { continue; }
746        let args = match sub_field.arguments() {
747            Ok(a) => a,
748            Err(_) => continue,
749        };
750        let of_dim = args.iter()
751            .find(|(k, _)| k.as_str() == "of")
752            .and_then(|(_, v)| match v {
753                async_graphql::Value::Enum(e) => Some(e.to_string()),
754                async_graphql::Value::String(s) => Some(s.clone()),
755                _ => None,
756            })
757            .unwrap_or_else(|| "*".to_string());
758        let level = args.iter()
759            .find(|(k, _)| k.as_str() == "level")
760            .and_then(|(_, v)| match v {
761                async_graphql::Value::Number(n) => n.as_f64(),
762                _ => None,
763            })
764            .unwrap_or(0.5);
765        let alias = sub_field.alias().unwrap_or("quantile").to_string();
766        requests.push(QuantileRequest { alias, of_dimension: of_dim, level });
767    }
768    requests
769}
770
771fn extract_field_aliases(
772    ctx: &async_graphql::dynamic::ResolverContext,
773    cube: &CubeDefinition,
774) -> FieldAliasMap {
775    let flat = cube.flat_dimensions();
776    let mut aliases = Vec::new();
777    collect_field_aliases(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut aliases);
778    aliases
779}
780
781fn collect_field_aliases(
782    field: &async_graphql::SelectionField<'_>,
783    prefix: &str,
784    flat: &[(String, crate::cube::definition::Dimension)],
785    metrics: &[MetricDef],
786    out: &mut FieldAliasMap,
787) {
788    for sub in field.selection_set() {
789        let name = sub.name();
790        if metrics.iter().any(|m| m.name == name) || name == "calculate" || name == "quantile" {
791            continue;
792        }
793        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
794        let has_children = sub.selection_set().next().is_some();
795        if has_children {
796            collect_field_aliases(&sub, &path, flat, metrics, out);
797        } else if let Some(alias) = sub.alias() {
798            if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
799                let alias_path = if prefix.is_empty() {
800                    alias.to_string()
801                } else {
802                    format!("{prefix}_{alias}")
803                };
804                out.push((alias_path, dim.column.clone()));
805            }
806        }
807    }
808}
809
810fn extract_calculate_requests(
811    ctx: &async_graphql::dynamic::ResolverContext,
812) -> Vec<CalculateRequest> {
813    let mut requests = Vec::new();
814    for sub_field in ctx.ctx.field().selection_set() {
815        if sub_field.name() != "calculate" { continue; }
816        let args = match sub_field.arguments() {
817            Ok(a) => a,
818            Err(_) => continue,
819        };
820        let expression = args.iter()
821            .find(|(k, _)| k.as_str() == "expression")
822            .and_then(|(_, v)| match v {
823                async_graphql::Value::String(s) => Some(s.clone()),
824                _ => None,
825            });
826        if let Some(expr) = expression {
827            let alias = sub_field.alias().unwrap_or("calculate").to_string();
828            requests.push(CalculateRequest { alias, expression: expr });
829        }
830    }
831    requests
832}
833
834fn extract_dim_agg_requests(
835    ctx: &async_graphql::dynamic::ResolverContext,
836    cube: &CubeDefinition,
837) -> Vec<DimAggRequest> {
838    let flat = cube.flat_dimensions();
839    let mut requests = Vec::new();
840    collect_dim_agg_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, cube, &mut requests);
841    requests
842}
843
844fn collect_dim_agg_paths(
845    field: &async_graphql::SelectionField<'_>,
846    prefix: &str,
847    flat: &[(String, crate::cube::definition::Dimension)],
848    metrics: &[MetricDef],
849    cube: &CubeDefinition,
850    out: &mut Vec<DimAggRequest>,
851) {
852    for sub in field.selection_set() {
853        let name = sub.name();
854        if metrics.iter().any(|m| m.name == name) {
855            continue;
856        }
857        let path = if prefix.is_empty() {
858            name.to_string()
859        } else {
860            format!("{prefix}_{name}")
861        };
862        let has_children = sub.selection_set().next().is_some();
863        if has_children {
864            collect_dim_agg_paths(&sub, &path, flat, metrics, cube, out);
865        } else {
866            let args = match sub.arguments() {
867                Ok(a) => a,
868                Err(_) => continue,
869            };
870            let max_val = args.iter().find(|(k, _)| k.as_str() == "maximum");
871            let min_val = args.iter().find(|(k, _)| k.as_str() == "minimum");
872
873            let (agg_type, compare_path) = if let Some((_, v)) = max_val {
874                let cp = match v {
875                    async_graphql::Value::Enum(e) => e.to_string(),
876                    async_graphql::Value::String(s) => s.clone(),
877                    _ => continue,
878                };
879                (DimAggType::ArgMax, cp)
880            } else if let Some((_, v)) = min_val {
881                let cp = match v {
882                    async_graphql::Value::Enum(e) => e.to_string(),
883                    async_graphql::Value::String(s) => s.clone(),
884                    _ => continue,
885                };
886                (DimAggType::ArgMin, cp)
887            } else {
888                continue;
889            };
890
891            let value_column = flat.iter()
892                .find(|(p, _)| p == &path)
893                .map(|(_, dim)| dim.column.clone());
894            let compare_column = flat.iter()
895                .find(|(p, _)| p == &compare_path)
896                .map(|(_, dim)| dim.column.clone());
897
898            if let (Some(vc), Some(cc)) = (value_column, compare_column) {
899                let condition_filter = args.iter()
900                    .find(|(k, _)| k.as_str() == "if")
901                    .and_then(|(_, v)| {
902                        compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
903                            .and_then(|f| if f.is_empty() { None } else { Some(f) })
904                    });
905                let select_where_value = args.iter()
906                    .find(|(k, _)| k.as_str() == "selectWhere")
907                    .map(|(_, v)| v.clone());
908
909                let gql_alias = sub.alias()
910                    .map(|a| a.to_string())
911                    .unwrap_or_else(|| path.clone());
912                out.push(DimAggRequest {
913                    field_path: path,
914                    graphql_alias: gql_alias,
915                    value_column: vc,
916                    agg_type,
917                    compare_column: cc,
918                    condition_filter,
919                    select_where_value,
920                });
921            }
922        }
923    }
924}
925
926fn extract_time_interval_requests(
927    ctx: &async_graphql::dynamic::ResolverContext,
928    cube: &CubeDefinition,
929) -> Vec<TimeIntervalRequest> {
930    let flat = cube.flat_dimensions();
931    let mut requests = Vec::new();
932    collect_time_interval_paths(&ctx.ctx.field(), "", &flat, &cube.metrics, &mut requests);
933    requests
934}
935
936fn collect_time_interval_paths(
937    field: &async_graphql::SelectionField<'_>,
938    prefix: &str,
939    flat: &[(String, crate::cube::definition::Dimension)],
940    metrics: &[MetricDef],
941    out: &mut Vec<TimeIntervalRequest>,
942) {
943    for sub in field.selection_set() {
944        let name = sub.name();
945        if metrics.iter().any(|m| m.name == name) { continue; }
946        let path = if prefix.is_empty() { name.to_string() } else { format!("{prefix}_{name}") };
947        let has_children = sub.selection_set().next().is_some();
948        if has_children {
949            collect_time_interval_paths(&sub, &path, flat, metrics, out);
950        } else {
951            let args = match sub.arguments() {
952                Ok(a) => a,
953                Err(_) => continue,
954            };
955            let interval_val = args.iter().find(|(k, _)| k.as_str() == "interval");
956            if let Some((_, async_graphql::Value::Object(obj))) = interval_val {
957                let unit = obj.get("in")
958                    .and_then(|v| match v {
959                        async_graphql::Value::Enum(e) => Some(e.to_string()),
960                        async_graphql::Value::String(s) => Some(s.clone()),
961                        _ => None,
962                    });
963                let count = obj.get("count")
964                    .and_then(|v| match v {
965                        async_graphql::Value::Number(n) => n.as_i64(),
966                        _ => None,
967                    });
968                if let (Some(unit), Some(count)) = (unit, count) {
969                    if let Some((_, dim)) = flat.iter().find(|(p, _)| p == &path) {
970                        let gql_alias = sub.alias().unwrap_or(name).to_string();
971                        out.push(TimeIntervalRequest {
972                            field_path: path,
973                            graphql_alias: gql_alias,
974                            column: dim.column.clone(),
975                            unit,
976                            count,
977                        });
978                    }
979                }
980            }
981        }
982    }
983}
984
985// ---------------------------------------------------------------------------
986// Per-Cube GraphQL type generation
987// ---------------------------------------------------------------------------
988
989struct CubeTypes {
990    objects: Vec<Object>,
991    inputs: Vec<InputObject>,
992    enums: Vec<Enum>,
993    unions: Vec<Union>,
994}
995
996fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
997    let record_name = format!("{}Record", cube.name);
998    let filter_name = format!("{}Filter", cube.name);
999    let compare_enum_name = format!("{}CompareFields", cube.name);
1000
1001    let flat_dims = cube.flat_dimensions();
1002
1003    let mut compare_enum = Enum::new(&compare_enum_name)
1004        .description(format!("Fields available for dimension aggregation (maximum/minimum) and ordering in {}", cube.name));
1005    for (path, _) in &flat_dims {
1006        compare_enum = compare_enum.item(EnumItem::new(path));
1007    }
1008
1009    let mut record_fields: Vec<Field> = Vec::new();
1010    let mut filter_fields: Vec<InputValue> = Vec::new();
1011    let mut extra_objects: Vec<Object> = Vec::new();
1012    let mut extra_inputs: Vec<InputObject> = Vec::new();
1013    let mut extra_unions: Vec<Union> = Vec::new();
1014
1015    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
1016        .description("OR combinator — matches if any sub-filter matches"));
1017
1018    {
1019        let mut collector = DimCollector {
1020            cube_name: &cube.name,
1021            compare_enum_name: &compare_enum_name,
1022            filter_name: &filter_name,
1023            record_fields: &mut record_fields,
1024            filter_fields: &mut filter_fields,
1025            extra_objects: &mut extra_objects,
1026            extra_inputs: &mut extra_inputs,
1027            extra_unions: &mut extra_unions,
1028        };
1029        for node in &cube.dimensions {
1030            collect_dimension_types(node, "", &mut collector);
1031        }
1032    }
1033
1034    let mut metric_enums: Vec<Enum> = Vec::new();
1035    let builtin_descs: std::collections::HashMap<&str, &str> = [
1036        ("count", "Count of rows or distinct values"),
1037        ("sum", "Sum of values"),
1038        ("avg", "Average of values"),
1039        ("min", "Minimum value"),
1040        ("max", "Maximum value"),
1041        ("uniq", "Count of unique (distinct) values"),
1042    ].into_iter().collect();
1043
1044    for metric in &cube.metrics {
1045        let metric_name = &metric.name;
1046        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
1047
1048        if metric.supports_where {
1049            extra_inputs.push(
1050                InputObject::new(&select_where_name)
1051                    .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
1052                    .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
1053                    .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
1054                    .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
1055                    .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
1056                    .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
1057            );
1058        }
1059
1060        let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
1061        let mut of_enum = Enum::new(&of_enum_name)
1062            .description(format!("Dimension to apply {} aggregation on", metric_name));
1063        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1064        metric_enums.push(of_enum);
1065
1066        let metric_name_clone = metric_name.clone();
1067        let return_type_ref = dim_type_to_typeref(&metric.return_type);
1068        let metric_desc = metric.description.as_deref()
1069            .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
1070            .unwrap_or("Aggregate metric");
1071
1072        let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
1073            let default_name = metric_name_clone.clone();
1074            FieldFuture::new(async move {
1075                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1076                let alias = ctx.ctx.field().alias().unwrap_or(&default_name);
1077                let key = metric_key(alias);
1078                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1079                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1080            })
1081        })
1082        .description(metric_desc)
1083        .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
1084            .description("Dimension to aggregate on (default: all rows)"));
1085
1086        if metric.supports_where {
1087            metric_field = metric_field
1088                .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
1089                    .description("Post-aggregation filter (HAVING)"))
1090                .argument(InputValue::new("if", TypeRef::named(&filter_name))
1091                    .description("Conditional filter for this metric"));
1092        }
1093
1094        record_fields.push(metric_field);
1095    }
1096
1097    // `quantile(of: ..., level: ...)` — percentile computation
1098    {
1099        let of_enum_name = format!("{}_quantile_Of", cube.name);
1100        let mut of_enum = Enum::new(&of_enum_name)
1101            .description(format!("Dimension to apply quantile on for {}", cube.name));
1102        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
1103        metric_enums.push(of_enum);
1104
1105        let of_enum_for_closure = of_enum_name.clone();
1106        let quantile_field = Field::new("quantile", TypeRef::named(TypeRef::FLOAT), |ctx| {
1107            FieldFuture::new(async move {
1108                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1109                let alias = ctx.ctx.field().alias().unwrap_or("quantile");
1110                let key = metric_key(alias);
1111                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1112                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1113            })
1114        })
1115        .description("Compute a quantile (percentile) of a dimension")
1116        .argument(InputValue::new("of", TypeRef::named_nn(&of_enum_for_closure))
1117            .description("Dimension to compute quantile on"))
1118        .argument(InputValue::new("level", TypeRef::named_nn(TypeRef::FLOAT))
1119            .description("Quantile level (0 to 1, e.g. 0.95 for 95th percentile)"));
1120        record_fields.push(quantile_field);
1121    }
1122
1123    // `calculate(expression: "...")` — runtime expression computation
1124    {
1125        let calculate_field = Field::new("calculate", TypeRef::named(TypeRef::FLOAT), |ctx| {
1126            FieldFuture::new(async move {
1127                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1128                let alias = ctx.ctx.field().alias().unwrap_or("calculate");
1129                let key = metric_key(alias);
1130                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1131                Ok(Some(FieldValue::value(json_to_gql_value(val))))
1132            })
1133        })
1134        .description("Compute an expression from other metric values. Use $field_name to reference metrics.")
1135        .argument(InputValue::new("expression", TypeRef::named_nn(TypeRef::STRING))
1136            .description("SQL expression with $variable references (e.g. \"$sell_volume - $buy_volume\")"));
1137        record_fields.push(calculate_field);
1138    }
1139
1140    // Add join fields: joinXxx returns the target cube's Record type
1141    for jd in &cube.joins {
1142        let target_record_name = format!("{}Record", jd.target_cube);
1143        let field_name_owned = jd.field_name.clone();
1144        let mut join_field = Field::new(
1145            &jd.field_name,
1146            TypeRef::named(&target_record_name),
1147            move |ctx| {
1148                let field_name = field_name_owned.clone();
1149                FieldFuture::new(async move {
1150                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1151                    if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
1152                        let sub_row: RowMap = obj.iter()
1153                            .map(|(k, v)| (k.clone(), v.clone()))
1154                            .collect();
1155                        Ok(Some(FieldValue::owned_any(sub_row)))
1156                    } else {
1157                        Ok(Some(FieldValue::value(Value::Null)))
1158                    }
1159                })
1160            },
1161        );
1162        if let Some(desc) = &jd.description {
1163            join_field = join_field.description(desc);
1164        }
1165        record_fields.push(join_field);
1166    }
1167
1168    let mut record = Object::new(&record_name);
1169    for f in record_fields { record = record.field(f); }
1170
1171    let mut filter = InputObject::new(&filter_name)
1172        .description(format!("Filter conditions for {} query", cube.name));
1173    for f in filter_fields { filter = filter.field(f); }
1174
1175    let orderby_input_name = format!("{}OrderByInput", cube.name);
1176    let orderby_input = InputObject::new(&orderby_input_name)
1177        .description(format!("Sort order for {} (Bitquery-compatible)", cube.name))
1178        .field(InputValue::new("descending", TypeRef::named(&compare_enum_name))
1179            .description("Sort descending by this field"))
1180        .field(InputValue::new("ascending", TypeRef::named(&compare_enum_name))
1181            .description("Sort ascending by this field"))
1182        .field(InputValue::new("descendingByField", TypeRef::named(TypeRef::STRING))
1183            .description("Sort descending by computed/aggregated field name"))
1184        .field(InputValue::new("ascendingByField", TypeRef::named(TypeRef::STRING))
1185            .description("Sort ascending by computed/aggregated field name"));
1186
1187    let limitby_input_name = format!("{}LimitByInput", cube.name);
1188    let limitby_input = InputObject::new(&limitby_input_name)
1189        .description(format!("Per-group row limit for {}", cube.name))
1190        .field(InputValue::new("by", TypeRef::named_nn(&compare_enum_name))
1191            .description("Dimension field to group by"))
1192        .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
1193            .description("Maximum rows per group"))
1194        .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
1195            .description("Rows to skip per group"));
1196
1197    let mut objects = vec![record]; objects.extend(extra_objects);
1198    let mut inputs = vec![filter, orderby_input, limitby_input]; inputs.extend(extra_inputs);
1199    let mut enums = vec![compare_enum]; enums.extend(metric_enums);
1200
1201    CubeTypes { objects, inputs, enums, unions: extra_unions }
1202}
1203
1204struct DimCollector<'a> {
1205    cube_name: &'a str,
1206    compare_enum_name: &'a str,
1207    filter_name: &'a str,
1208    record_fields: &'a mut Vec<Field>,
1209    filter_fields: &'a mut Vec<InputValue>,
1210    extra_objects: &'a mut Vec<Object>,
1211    extra_inputs: &'a mut Vec<InputObject>,
1212    extra_unions: &'a mut Vec<Union>,
1213}
1214
1215fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
1216    match node {
1217        DimensionNode::Leaf(dim) => {
1218            let col = dim.column.clone();
1219            let is_datetime = dim.dim_type == DimType::DateTime;
1220            let compare_enum = c.compare_enum_name.to_string();
1221            let cube_filter = c.filter_name.to_string();
1222            let mut leaf_field = Field::new(
1223                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
1224                move |ctx| {
1225                    let col = col.clone();
1226                    FieldFuture::new(async move {
1227                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1228                        let has_interval = ctx.args.try_get("interval").is_ok();
1229                        let has_max = ctx.args.try_get("maximum").is_ok();
1230                        let has_min = ctx.args.try_get("minimum").is_ok();
1231                        let key = if has_interval || has_max || has_min {
1232                            let name = ctx.ctx.field().alias().unwrap_or(&col);
1233                            dim_agg_key(name)
1234                        } else {
1235                            col.clone()
1236                        };
1237                        let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
1238                        let gql_val = if is_datetime {
1239                            json_to_gql_datetime(val)
1240                        } else {
1241                            json_to_gql_value(val)
1242                        };
1243                        Ok(Some(FieldValue::value(gql_val)))
1244                    })
1245                },
1246            );
1247            if let Some(desc) = &dim.description {
1248                leaf_field = leaf_field.description(desc);
1249            }
1250            leaf_field = leaf_field
1251                .argument(InputValue::new("maximum", TypeRef::named(&compare_enum))
1252                    .description("Return value from row where compare field is maximum (argMax)"))
1253                .argument(InputValue::new("minimum", TypeRef::named(&compare_enum))
1254                    .description("Return value from row where compare field is minimum (argMin)"))
1255                .argument(InputValue::new("if", TypeRef::named(&cube_filter))
1256                    .description("Conditional filter for aggregation"))
1257                .argument(InputValue::new("selectWhere", TypeRef::named("DimSelectWhere"))
1258                    .description("Post-aggregation value filter (HAVING)"));
1259            if is_datetime {
1260                leaf_field = leaf_field
1261                    .argument(InputValue::new("interval", TypeRef::named("TimeIntervalInput"))
1262                        .description("Time bucketing interval (e.g. {in: minutes, count: 1})"));
1263            }
1264            // Update resolver key: if interval is present, read from the interval expression key
1265            // This is handled by the resolver checking ctx.args for "interval"
1266            c.record_fields.push(leaf_field);
1267            if matches!(dim.dim_type, DimType::Bool) {
1268                c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(TypeRef::BOOLEAN)));
1269            } else {
1270                c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
1271            }
1272        }
1273        DimensionNode::Group { graphql_name, description, children } => {
1274            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1275            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
1276            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
1277
1278            let mut child_record_fields: Vec<Field> = Vec::new();
1279            let mut child_filter_fields: Vec<InputValue> = Vec::new();
1280            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
1281
1282            let mut child_collector = DimCollector {
1283                cube_name: c.cube_name,
1284                compare_enum_name: c.compare_enum_name,
1285                filter_name: c.filter_name,
1286                record_fields: &mut child_record_fields,
1287                filter_fields: &mut child_filter_fields,
1288                extra_objects: c.extra_objects,
1289                extra_inputs: c.extra_inputs,
1290                extra_unions: c.extra_unions,
1291            };
1292            for child in children {
1293                collect_dimension_types(child, &new_prefix, &mut child_collector);
1294            }
1295
1296            let mut nested_record = Object::new(&nested_record_name);
1297            for f in child_record_fields { nested_record = nested_record.field(f); }
1298
1299            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
1300            let mut nested_filter = InputObject::new(&nested_filter_name)
1301                .description(nested_filter_desc);
1302            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
1303
1304            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
1305                FieldFuture::new(async move {
1306                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1307                    Ok(Some(FieldValue::owned_any(row.clone())))
1308                })
1309            });
1310            if let Some(desc) = description {
1311                group_field = group_field.description(desc);
1312            }
1313            c.record_fields.push(group_field);
1314            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
1315            c.extra_objects.push(nested_record);
1316            c.extra_inputs.push(nested_filter);
1317        }
1318        DimensionNode::Array { graphql_name, description, children } => {
1319            let full_path = if prefix.is_empty() {
1320                graphql_name.clone()
1321            } else {
1322                format!("{prefix}_{graphql_name}")
1323            };
1324            let element_type_name = format!("{}_{full_path}_Element", c.cube_name);
1325            let includes_filter_name = format!("{}_{full_path}_IncludesFilter", c.cube_name);
1326
1327            let mut element_obj = Object::new(&element_type_name);
1328            let mut includes_filter = InputObject::new(&includes_filter_name)
1329                .description(format!("Element-level filter for {} (used with includes)", graphql_name));
1330
1331            let mut union_registrations: Vec<(String, Union, Vec<Object>)> = Vec::new();
1332
1333            for child in children {
1334                match &child.field_type {
1335                    crate::cube::definition::ArrayFieldType::Scalar(dt) => {
1336                        let col_name = child.column.clone();
1337                        let is_datetime = *dt == DimType::DateTime;
1338                        let mut field = Field::new(
1339                            &child.graphql_name,
1340                            dim_type_to_typeref(dt),
1341                            move |ctx| {
1342                                let col = col_name.clone();
1343                                FieldFuture::new(async move {
1344                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1345                                    let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1346                                    let gql_val = if is_datetime {
1347                                        json_to_gql_datetime(val)
1348                                    } else {
1349                                        json_to_gql_value(val)
1350                                    };
1351                                    Ok(Some(FieldValue::value(gql_val)))
1352                                })
1353                            },
1354                        );
1355                        if let Some(desc) = &child.description {
1356                            field = field.description(desc);
1357                        }
1358                        element_obj = element_obj.field(field);
1359                        includes_filter = includes_filter.field(
1360                            InputValue::new(&child.graphql_name, TypeRef::named(dim_type_to_filter_name(dt)))
1361                        );
1362                    }
1363                    crate::cube::definition::ArrayFieldType::Union(variants) => {
1364                        let union_name = format!("{}_{full_path}_{}_Union", c.cube_name, child.graphql_name);
1365
1366                        let mut gql_union = Union::new(&union_name);
1367                        let mut variant_objects = Vec::new();
1368
1369                        for v in variants {
1370                            let variant_obj_name = v.type_name.clone();
1371                            let field_name = v.field_name.clone();
1372                            let source_type = v.source_type.clone();
1373                            let type_ref = dim_type_to_typeref(&source_type);
1374
1375                            let val_col = child.column.clone();
1376                            let variant_obj = Object::new(&variant_obj_name)
1377                                .field(Field::new(
1378                                    &field_name,
1379                                    type_ref,
1380                                    move |ctx| {
1381                                        let col = val_col.clone();
1382                                        FieldFuture::new(async move {
1383                                            let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1384                                            let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1385                                            Ok(Some(FieldValue::value(json_to_gql_value(val))))
1386                                        })
1387                                    },
1388                                ));
1389                            gql_union = gql_union.possible_type(&variant_obj_name);
1390                            variant_objects.push(variant_obj);
1391                        }
1392
1393                        let col_name = child.column.clone();
1394                        let type_col = children.iter()
1395                            .find(|f| f.graphql_name == "Type")
1396                            .map(|f| f.column.clone())
1397                            .unwrap_or_default();
1398                        let variants_clone: Vec<crate::cube::definition::UnionVariant> = variants.clone();
1399
1400                        let mut field = Field::new(
1401                            &child.graphql_name,
1402                            TypeRef::named(&union_name),
1403                            move |ctx| {
1404                                let col = col_name.clone();
1405                                let tcol = type_col.clone();
1406                                let vars = variants_clone.clone();
1407                                FieldFuture::new(async move {
1408                                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1409                                    let raw_val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
1410                                    if raw_val.is_null() || raw_val.as_str() == Some("") {
1411                                        return Ok(None);
1412                                    }
1413                                    let type_str = row.get(&tcol)
1414                                        .and_then(|v| v.as_str())
1415                                        .unwrap_or("");
1416                                    let resolved_type = resolve_union_typename(type_str, &vars);
1417                                    let mut elem = RowMap::new();
1418                                    elem.insert(col.clone(), raw_val);
1419                                    Ok(Some(FieldValue::owned_any(elem).with_type(resolved_type)))
1420                                })
1421                            },
1422                        );
1423                        if let Some(desc) = &child.description {
1424                            field = field.description(desc);
1425                        }
1426                        element_obj = element_obj.field(field);
1427
1428                        includes_filter = includes_filter.field(
1429                            InputValue::new(&child.graphql_name, TypeRef::named("StringFilter"))
1430                        );
1431
1432                        union_registrations.push((union_name, gql_union, variant_objects));
1433                    }
1434                }
1435            }
1436
1437            // Register union types and variant objects
1438            for (_, union_type, variant_objs) in union_registrations {
1439                c.extra_objects.extend(variant_objs);
                // A union is not an object type, so it cannot go into `extra_objects`;
                // it is collected in `extra_unions` instead.
1447                c.extra_unions.push(union_type);
1448            }
1449
1450            // Build the list resolver: zip parallel array columns into element objects
1451            let child_columns: Vec<(String, String)> = children.iter()
1452                .map(|f| (f.graphql_name.clone(), f.column.clone()))
1453                .collect();
1454            let element_type_name_clone = element_type_name.clone();
1455
1456            let mut array_field = Field::new(
1457                graphql_name,
1458                TypeRef::named_nn_list_nn(&element_type_name),
1459                move |ctx| {
1460                    let cols = child_columns.clone();
1461                    let _etype = element_type_name_clone.clone();
1462                    FieldFuture::new(async move {
1463                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
1464                        let arrays: Vec<(&str, Vec<serde_json::Value>)> = cols.iter()
1465                            .map(|(gql_name, col)| {
1466                                let arr = row.get(col)
1467                                    .and_then(|v| v.as_array())
1468                                    .cloned()
1469                                    .unwrap_or_default();
1470                                (gql_name.as_str(), arr)
1471                            })
1472                            .collect();
1473
1474                        let len = arrays.first().map(|(_, a)| a.len()).unwrap_or(0);
1475                        let mut elements = Vec::with_capacity(len);
1476                        for i in 0..len {
1477                            let mut elem = RowMap::new();
1478                            for (gql_name, arr) in &arrays {
1479                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1480                                elem.insert(gql_name.to_string(), val);
1481                            }
1482                            // Also store by column name for Union resolvers
1483                            for ((_gql_name, col), (_, arr)) in cols.iter().zip(arrays.iter()) {
1484                                let val = arr.get(i).cloned().unwrap_or(serde_json::Value::Null);
1485                                elem.insert(col.clone(), val);
1486                            }
1487                            elements.push(FieldValue::owned_any(elem));
1488                        }
1489                        Ok(Some(FieldValue::list(elements)))
1490                    })
1491                },
1492            );
1493            if let Some(desc) = description {
1494                array_field = array_field.description(desc);
1495            }
1496            c.record_fields.push(array_field);
1497
1498            // Wrap the element-level IncludesFilter in an ArrayFilter so the
1499            // parser can find the `includes` key (compiler/filter.rs expects
1500            // `{ includes: [{ Name: { is: "..." } }] }`).
1501            let wrapper_filter_name = format!("{}_{full_path}_ArrayFilter", c.cube_name);
1502            let wrapper_filter = InputObject::new(&wrapper_filter_name)
1503                .description(format!("Array filter for {} — use `includes` to match elements", graphql_name))
1504                .field(InputValue::new("includes", TypeRef::named_list(&includes_filter_name))
1505                    .description("Match rows where at least one array element satisfies all conditions"));
1506            c.extra_inputs.push(wrapper_filter);
1507
1508            c.filter_fields.push(InputValue::new(
1509                graphql_name,
1510                TypeRef::named(&wrapper_filter_name),
1511            ));
1512
1513            c.extra_objects.push(element_obj);
1514            c.extra_inputs.push(includes_filter);
1515        }
1516    }
1517}
1518
1519/// Resolve the GraphQL Union __typename from a discriminator column value.
1520///
1521/// Walks variants in order; the first whose `source_type_names` contains
1522/// `type_str` wins.  If none match, the last variant is used as fallback
1523/// (conventionally the "catch-all" variant such as JSON).
1524fn resolve_union_typename(type_str: &str, variants: &[crate::cube::definition::UnionVariant]) -> String {
1525    for v in variants {
1526        if v.source_type_names.iter().any(|s| s == type_str) {
1527            return v.type_name.clone();
1528        }
1529    }
1530    variants.last().map(|v| v.type_name.clone()).unwrap_or_default()
1531}
1532
1533fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
1534    match dt {
1535        DimType::String | DimType::Decimal => TypeRef::named(TypeRef::STRING),
1536        DimType::DateTime => TypeRef::named("DateTime"),
1537        DimType::Int => TypeRef::named(TypeRef::INT),
1538        DimType::Float => TypeRef::named(TypeRef::FLOAT),
1539        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
1540    }
1541}
1542
1543fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
1544    match dt {
1545        DimType::String => "StringFilter",
1546        DimType::Int => "IntFilter",
1547        DimType::Float => "FloatFilter",
1548        DimType::Decimal => "DecimalFilter",
1549        DimType::DateTime => "DateTimeFilter",
1550        DimType::Bool => "BoolFilter",
1551    }
1552}
1553
1554pub fn json_to_gql_value(v: serde_json::Value) -> Value {
1555    match v {
1556        serde_json::Value::Null => Value::Null,
1557        serde_json::Value::Bool(b) => Value::from(b),
1558        serde_json::Value::Number(n) => {
1559            if let Some(i) = n.as_i64() { Value::from(i) }
1560            else if let Some(f) = n.as_f64() { Value::from(f) }
1561            else { Value::from(n.to_string()) }
1562        }
1563        serde_json::Value::String(s) => Value::from(s),
1564        _ => Value::from(v.to_string()),
1565    }
1566}
1567
1568/// Build a JoinExpr from a JoinDef and target cube definition.
1569/// Inspects the sub-selection of the join field to determine which columns to SELECT.
1570fn build_join_expr(
1571    jd: &crate::cube::definition::JoinDef,
1572    target_cube: &CubeDefinition,
1573    sub_field: &async_graphql::SelectionField<'_>,
1574    network: &str,
1575    join_idx: usize,
1576) -> JoinExpr {
1577    let target_flat = target_cube.flat_dimensions();
1578    let target_table = target_cube.table_for_chain(network);
1579
1580    let mut requested_paths = HashSet::new();
1581    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
1582
1583    let mut selects: Vec<SelectExpr> = target_flat.iter()
1584        .filter(|(path, _)| requested_paths.contains(path))
1585        .map(|(_, dim)| SelectExpr::Column {
1586            column: dim.column.clone(),
1587            alias: None,
1588        })
1589        .collect();
1590
1591    if selects.is_empty() {
1592        selects = target_flat.iter()
1593            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
1594            .collect();
1595    }
1596
1597    let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
1598
1599    let group_by = if is_aggregate {
1600        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
1601        for sel in &selects {
1602            if let SelectExpr::Column { column, .. } = sel {
1603                if !column.contains('(') && !gb.contains(column) {
1604                    gb.push(column.clone());
1605                }
1606            }
1607        }
1608        gb
1609    } else {
1610        vec![]
1611    };
1612
1613    JoinExpr {
1614        schema: target_cube.schema.clone(),
1615        table: target_table,
1616        alias: format!("_j{}", join_idx),
1617        conditions: jd.conditions.clone(),
1618        selects,
1619        group_by,
1620        use_final: target_cube.use_final,
1621        is_aggregate,
1622        target_cube: jd.target_cube.clone(),
1623        join_field: sub_field.name().to_string(),
1624        join_type: jd.join_type.clone(),
1625    }
1626}
1627
1628/// Convert a ClickHouse DateTime value to ISO 8601 format.
1629/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
1630fn json_to_gql_datetime(v: serde_json::Value) -> Value {
1631    match v {
1632        serde_json::Value::String(s) => {
1633            let iso = if s.contains('T') {
1634                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
1635            } else {
1636                let replaced = s.replacen(' ', "T", 1);
1637                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
1638            };
1639            Value::from(iso)
1640        }
1641        other => json_to_gql_value(other),
1642    }
1643}