// activecube_rs/schema/generator.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Async function type that executes a compiled SQL query and returns rows.
/// The service layer provides this — the library never touches a database directly.
///
/// The two arguments are the final SQL text and its positional bind values
/// (`SqlValue`); the returned future resolves to the result rows, or an
/// `Err(String)` describing the driver/execution failure.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
22
/// Configuration for supported networks (chains) and optional stats collection.
pub struct SchemaConfig {
    /// Networks exposed as items of the GraphQL `Network` enum.
    pub networks: Vec<String>,
    /// Intended name of the root query type.
    /// NOTE(review): `build_schema` currently hardcodes `"Query"` and never
    /// reads this field — confirm whether it should be wired through.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    pub stats_callback: Option<StatsCallback>,
}
31
32impl Default for SchemaConfig {
33    fn default() -> Self {
34        Self {
35            networks: vec!["sol", "eth", "bsc"]
36                .into_iter().map(String::from).collect(),
37            root_query_name: "ChainStream".to_string(),
38            stats_callback: None,
39        }
40    }
41}
42
/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
///
/// Registers shared input/enum types (Network, LimitInput, LimitByInput,
/// OrderDirection, filter primitives), then one top-level Query field per cube
/// plus an internal `_cubeMetadata` field. The registry is also stashed in the
/// schema data so resolvers can look cubes up at request time.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    // NOTE(review): the root type name is hardcoded to "Query" here and below
    // (`Object::new("Query")`); `config.root_query_name` is never read —
    // confirm whether that field should be used instead.
    let mut builder = Schema::build("Query", None, None);

    // One shared Network enum built from the configured network list.
    let mut network_enum = Enum::new("Network")
        .description("Blockchain network to query");
    for net in &config.networks {
        network_enum = network_enum.item(EnumItem::new(net));
    }
    builder = builder.register(network_enum);
    builder = builder.register(filter_types::build_limit_input());

    builder = builder.register(
        InputObject::new("LimitByInput")
            .description("Limit results per group (similar to ClickHouse LIMIT BY)")
            .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
                .description("Comma-separated dimension names to group by"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
                .description("Maximum rows per group"))
            .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
                .description("Rows to skip per group")),
    );

    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );

    // Shared scalar filter inputs (StringFilter, IntFilter, ...).
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }

    // Cubes are top-level Query fields, each with a required `network` argument.
    // Query pattern: `query { DEXTrades(network: sol, limit: ...) { ... } }`
    let mut query = Object::new("Query");

    for cube in registry.cubes() {
        // Per-cube generated types (Record/Filter/OrderBy/metric enums...).
        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }

        // Clones moved into the resolver closure below.
        let cube_name = cube.name.clone();
        let dialect_clone = dialect.clone();
        let executor_clone = executor.clone();
        let stats_cb = config.stats_callback.clone();

        let orderby_list_input_name = format!("{}OrderByInput", cube.name);

        let cube_description = cube.description.clone();
        let mut field = Field::new(
            &cube.name,
            TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
            move |ctx| {
                let cube_name = cube_name.clone();
                let dialect = dialect_clone.clone();
                let executor = executor_clone.clone();
                let stats_cb = stats_cb.clone();
                FieldFuture::new(async move {
                    // Re-fetch the cube definition from schema data at request time.
                    let registry = ctx.ctx.data::<CubeRegistry>()?;
                    let network_val = ctx.args.try_get("network")?;
                    let network = network_val.enum_name()
                        .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;

                    let cube_def = registry.get(&cube_name).ok_or_else(|| {
                        async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                    })?;

                    // Parse the GraphQL selection + arguments into the query IR.
                    let metric_requests = extract_metric_requests(&ctx, cube_def);
                    let requested = extract_requested_fields(&ctx, cube_def);
                    let mut ir = compiler::parser::parse_cube_query(
                        cube_def,
                        network,
                        &ctx.args,
                        &metric_requests,
                        Some(requested),
                    )?;

                    // Resolve join fields from the selection set
                    let mut join_idx = 0usize;
                    for sub_field in ctx.ctx.field().selection_set() {
                        let fname = sub_field.name().to_string();
                        let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                        if let Some(jd) = join_def {
                            if let Some(target_cube) = registry.get(&jd.target_cube) {
                                let join_expr = build_join_expr(
                                    jd, target_cube, &sub_field, network, join_idx,
                                );
                                ir.joins.push(join_expr);
                                join_idx += 1;
                            }
                        }
                    }

                    // Validate the IR, compile to dialect SQL, and execute.
                    let validated = compiler::validator::validate(ir)?;
                    let result = dialect.compile(&validated);
                    let sql = result.sql;
                    let bindings = result.bindings;

                    let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                        async_graphql::Error::new(format!("Query execution failed: {e}"))
                    })?;

                    // Remap aliased columns back to original names for resolvers
                    let rows = if result.alias_remap.is_empty() {
                        rows
                    } else {
                        rows.into_iter().map(|mut row| {
                            for (alias, original) in &result.alias_remap {
                                if let Some(val) = row.shift_remove(alias) {
                                    // or_insert: don't clobber a column that already
                                    // exists under the original name.
                                    row.entry(original.clone()).or_insert(val);
                                }
                            }
                            row
                        }).collect()
                    };

                    // Restructure join data: extract _jN.xxx columns into nested objects
                    let rows: Vec<RowMap> = if validated.joins.is_empty() {
                        rows
                    } else {
                        rows.into_iter().map(|mut row| {
                            for join in &validated.joins {
                                let prefix = format!("{}.", join.alias);
                                let mut sub_row = RowMap::new();
                                // Collect keys first: can't mutate the row while
                                // iterating its keys.
                                let keys: Vec<String> = row.keys()
                                    .filter(|k| k.starts_with(&prefix))
                                    .cloned()
                                    .collect();
                                for key in keys {
                                    if let Some(val) = row.shift_remove(&key) {
                                        sub_row.insert(key[prefix.len()..].to_string(), val);
                                    }
                                }
                                let obj: serde_json::Map<String, serde_json::Value> =
                                    sub_row.into_iter().collect();
                                row.insert(
                                    join.join_field.clone(),
                                    serde_json::Value::Object(obj),
                                );
                            }
                            row
                        }).collect()
                    };

                    // Stats callback: a per-request one in context data wins over
                    // the schema-level one from the config.
                    let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                        .or_else(|| stats_cb.clone());
                    if let Some(cb) = effective_cb {
                        let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                        cb(stats);
                    }

                    // Each row becomes an owned RowMap that the Record-type field
                    // resolvers downcast.
                    let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                    Ok(Some(FieldValue::list(values)))
                })
            },
        );
        if !cube_description.is_empty() {
            field = field.description(&cube_description);
        }
        // Common arguments shared by every cube field.
        field = field
            .argument(InputValue::new("network", TypeRef::named_nn("Network"))
                .description("Blockchain network to query"))
            .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
                .description("Filter conditions"))
            .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
                .description("Pagination control"))
            .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
                .description("Per-group row limit"))
            .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name)))
                .description("Sort order (single column)"))
            .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name))
                .description("Sort order (multiple columns)"));

        // Cube-defined shorthand selector arguments (typed by dimension type).
        for sel in &cube.selectors {
            let filter_type = dim_type_to_filter_name(&sel.dim_type);
            field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
                .description(format!("Shorthand filter for {}", sel.graphql_name)));
        }

        query = query.field(field);
    }

    // Internal introspection helper returning all cube metadata as a JSON string.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            FieldFuture::new(async move {
                    let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
                    serde_json::json!({
                        "name": cube.name,
                        "description": cube.description,
                        "schema": cube.schema,
                        "tablePattern": cube.table_pattern,
                        "metrics": cube.metrics.iter().map(|m| {
                            let mut obj = serde_json::json!({
                                "name": m.name,
                                "returnType": format!("{:?}", m.return_type),
                            });
                            if let Some(ref tmpl) = m.expression_template {
                                obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
                            }
                            if let Some(ref desc) = m.description {
                                obj["description"] = serde_json::Value::String(desc.clone());
                            }
                            obj
                        }).collect::<Vec<_>>(),
                        "selectors": cube.selectors.iter().map(|s| {
                            serde_json::json!({
                                "name": s.graphql_name,
                                "column": s.column,
                                "type": format!("{:?}", s.dim_type),
                            })
                        }).collect::<Vec<_>>(),
                        "dimensions": serialize_dims(&cube.dimensions),
                        "joins": cube.joins.iter().map(|j| {
                            serde_json::json!({
                                "field": j.field_name,
                                "target": j.target_cube,
                                "joinType": format!("{:?}", j.join_type),
                            })
                        }).collect::<Vec<_>>(),
                        "tableRoutes": cube.table_routes.iter().map(|r| {
                            serde_json::json!({
                                "schema": r.schema,
                                "tablePattern": r.table_pattern,
                                "availableColumns": r.available_columns,
                                "priority": r.priority,
                            })
                        }).collect::<Vec<_>>(),
                        "defaultLimit": cube.default_limit,
                        "maxLimit": cube.max_limit,
                    })
                }).collect();
                let json = serde_json::to_string(&metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes");
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Make the registry available to resolvers via context data.
    builder = builder.data(registry);

    builder.finish()
}
299
300fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
301    serde_json::Value::Array(dims.iter().map(|d| match d {
302        DimensionNode::Leaf(dim) => {
303            let mut obj = serde_json::json!({
304                "name": dim.graphql_name,
305                "column": dim.column,
306                "type": format!("{:?}", dim.dim_type),
307            });
308            if let Some(desc) = &dim.description {
309                obj["description"] = serde_json::Value::String(desc.clone());
310            }
311            obj
312        },
313        DimensionNode::Group { graphql_name, description, children } => {
314            let mut obj = serde_json::json!({
315                "name": graphql_name,
316                "children": serialize_dims(children),
317            });
318            if let Some(desc) = description {
319                obj["description"] = serde_json::Value::String(desc.clone());
320            }
321            obj
322        },
323    }).collect())
324}
325
326/// Extract metric requests from the GraphQL selection set by inspecting
327/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
328/// the "count" field in the selection set and extract its `of` argument.
329fn extract_metric_requests(
330    ctx: &async_graphql::dynamic::ResolverContext,
331    cube: &CubeDefinition,
332) -> Vec<compiler::parser::MetricRequest> {
333    let mut requests = Vec::new();
334
335    for sub_field in ctx.ctx.field().selection_set() {
336        let name = sub_field.name();
337        if !cube.has_metric(name) {
338            continue;
339        }
340
341        let args = match sub_field.arguments() {
342            Ok(args) => args,
343            Err(_) => continue,
344        };
345
346        let of_dimension = args
347            .iter()
348            .find(|(k, _)| k.as_str() == "of")
349            .and_then(|(_, v)| match v {
350                async_graphql::Value::Enum(e) => Some(e.to_string()),
351                async_graphql::Value::String(s) => Some(s.clone()),
352                _ => None,
353            })
354            .unwrap_or_else(|| "*".to_string());
355
356        let select_where_value = args
357            .iter()
358            .find(|(k, _)| k.as_str() == "selectWhere")
359            .map(|(_, v)| v.clone());
360
361        let condition_filter = args
362            .iter()
363            .find(|(k, _)| k.as_str() == "if")
364            .and_then(|(_, v)| {
365                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
366                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
367            });
368
369        requests.push(compiler::parser::MetricRequest {
370            function: name.to_string(),
371            of_dimension,
372            select_where_value,
373            condition_filter,
374        });
375    }
376
377    requests
378}
379
380fn extract_requested_fields(
381    ctx: &async_graphql::dynamic::ResolverContext,
382    cube: &CubeDefinition,
383) -> HashSet<String> {
384    let mut fields = HashSet::new();
385    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
386    fields
387}
388
389fn collect_selection_paths(
390    field: &async_graphql::SelectionField<'_>,
391    prefix: &str,
392    out: &mut HashSet<String>,
393    metrics: &[MetricDef],
394) {
395    for sub in field.selection_set() {
396        let name = sub.name();
397        if metrics.iter().any(|m| m.name == name) {
398            continue;
399        }
400        let path = if prefix.is_empty() {
401            name.to_string()
402        } else {
403            format!("{prefix}_{name}")
404        };
405        let has_children = sub.selection_set().next().is_some();
406        if has_children {
407            collect_selection_paths(&sub, &path, out, metrics);
408        } else {
409            out.insert(path);
410        }
411    }
412}
413
414// ---------------------------------------------------------------------------
415// Per-Cube GraphQL type generation
416// ---------------------------------------------------------------------------
417
/// GraphQL types generated for one cube, to be registered on the schema builder.
struct CubeTypes {
    /// `{Cube}Record` plus nested group record objects.
    objects: Vec<Object>,
    /// `{Cube}Filter`, `{Cube}OrderByInput`, nested filters, metric SelectWhere inputs.
    inputs: Vec<InputObject>,
    /// `{Cube}OrderBy`, `{Cube}OrderBy_Field`, and per-metric `of` enums.
    enums: Vec<Enum>,
}
423
424fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
425    let record_name = format!("{}Record", cube.name);
426    let filter_name = format!("{}Filter", cube.name);
427    let orderby_name = format!("{}OrderBy", cube.name);
428
429    let mut record_fields: Vec<Field> = Vec::new();
430    let mut filter_fields: Vec<InputValue> = Vec::new();
431    let mut orderby_items: Vec<String> = Vec::new();
432    let mut extra_objects: Vec<Object> = Vec::new();
433    let mut extra_inputs: Vec<InputObject> = Vec::new();
434
435    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
436        .description("OR combinator — matches if any sub-filter matches"));
437
438    {
439        let mut collector = DimCollector {
440            cube_name: &cube.name,
441            record_fields: &mut record_fields,
442            filter_fields: &mut filter_fields,
443            orderby_items: &mut orderby_items,
444            extra_objects: &mut extra_objects,
445            extra_inputs: &mut extra_inputs,
446        };
447        for node in &cube.dimensions {
448            collect_dimension_types(node, "", &mut collector);
449        }
450    }
451
452    let flat_dims = cube.flat_dimensions();
453    let mut metric_enums: Vec<Enum> = Vec::new();
454    let builtin_descs: std::collections::HashMap<&str, &str> = [
455        ("count", "Count of rows or distinct values"),
456        ("sum", "Sum of values"),
457        ("avg", "Average of values"),
458        ("min", "Minimum value"),
459        ("max", "Maximum value"),
460        ("uniq", "Count of unique (distinct) values"),
461    ].into_iter().collect();
462
463    for metric in &cube.metrics {
464        let metric_name = &metric.name;
465        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
466
467        if metric.supports_where {
468            extra_inputs.push(
469                InputObject::new(&select_where_name)
470                    .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
471                    .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
472                    .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
473                    .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
474                    .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
475                    .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
476            );
477        }
478
479        let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
480        let mut of_enum = Enum::new(&of_enum_name)
481            .description(format!("Dimension to apply {} aggregation on", metric_name));
482        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
483        metric_enums.push(of_enum);
484
485        let metric_name_clone = metric_name.clone();
486        let return_type_ref = dim_type_to_typeref(&metric.return_type);
487        let metric_desc = metric.description.as_deref()
488            .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
489            .unwrap_or("Aggregate metric");
490
491        let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
492            let metric_key = metric_name_clone.clone();
493            FieldFuture::new(async move {
494                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
495                let key = format!("__{metric_key}");
496                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
497                Ok(Some(FieldValue::value(json_to_gql_value(val))))
498            })
499        })
500        .description(metric_desc)
501        .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
502            .description("Dimension to aggregate on (default: all rows)"));
503
504        if metric.supports_where {
505            metric_field = metric_field
506                .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
507                    .description("Post-aggregation filter (HAVING)"))
508                .argument(InputValue::new("if", TypeRef::named(&filter_name))
509                    .description("Conditional filter for this metric"));
510        }
511
512        record_fields.push(metric_field);
513    }
514
515    // Add join fields: joinXxx returns the target cube's Record type
516    for jd in &cube.joins {
517        let target_record_name = format!("{}Record", jd.target_cube);
518        let field_name_owned = jd.field_name.clone();
519        let mut join_field = Field::new(
520            &jd.field_name,
521            TypeRef::named(&target_record_name),
522            move |ctx| {
523                let field_name = field_name_owned.clone();
524                FieldFuture::new(async move {
525                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
526                    if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
527                        let sub_row: RowMap = obj.iter()
528                            .map(|(k, v)| (k.clone(), v.clone()))
529                            .collect();
530                        Ok(Some(FieldValue::owned_any(sub_row)))
531                    } else {
532                        Ok(Some(FieldValue::value(Value::Null)))
533                    }
534                })
535            },
536        );
537        if let Some(desc) = &jd.description {
538            join_field = join_field.description(desc);
539        }
540        record_fields.push(join_field);
541    }
542
543    let mut record = Object::new(&record_name);
544    for f in record_fields { record = record.field(f); }
545
546    let mut filter = InputObject::new(&filter_name)
547        .description(format!("Filter conditions for {} query", cube.name));
548    for f in filter_fields { filter = filter.field(f); }
549
550    let mut orderby = Enum::new(&orderby_name)
551        .description(format!("Sort order for {} results (single column)", cube.name));
552    for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
553
554    // Multi-column orderBy: {Cube}OrderBy_Field enum + {Cube}OrderByInput
555    let field_enum_name = format!("{}_Field", orderby_name);
556    let orderby_input_name = format!("{}OrderByInput", cube.name);
557    let mut field_enum = Enum::new(&field_enum_name)
558        .description(format!("Available fields for {} multi-column sort", cube.name));
559    let flat_dims = cube.flat_dimensions();
560    for (path, _) in &flat_dims {
561        field_enum = field_enum.item(EnumItem::new(path));
562    }
563    let orderby_input = InputObject::new(&orderby_input_name)
564        .description(format!("Multi-column sort input for {}", cube.name))
565        .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name))
566            .description("Field to sort by"))
567        .field(InputValue::new("direction", TypeRef::named("OrderDirection"))
568            .description("Sort direction (ASC or DESC)"));
569
570    let mut objects = vec![record]; objects.extend(extra_objects);
571    let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
572    let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
573
574    CubeTypes { objects, inputs, enums }
575}
576
/// Mutable accumulators threaded through the recursive dimension walk in
/// `collect_dimension_types`. Nested groups receive fresh record/filter field
/// lists but share the same orderby/extra-type vectors with the parent.
struct DimCollector<'a> {
    cube_name: &'a str,
    record_fields: &'a mut Vec<Field>,
    filter_fields: &'a mut Vec<InputValue>,
    orderby_items: &'a mut Vec<String>,
    extra_objects: &'a mut Vec<Object>,
    extra_inputs: &'a mut Vec<InputObject>,
}
585
586fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
587    match node {
588        DimensionNode::Leaf(dim) => {
589            let col = dim.column.clone();
590            let is_datetime = dim.dim_type == DimType::DateTime;
591            let mut leaf_field = Field::new(
592                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
593                move |ctx| {
594                    let col = col.clone();
595                    FieldFuture::new(async move {
596                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
597                        let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
598                        let gql_val = if is_datetime {
599                            json_to_gql_datetime(val)
600                        } else {
601                            json_to_gql_value(val)
602                        };
603                        Ok(Some(FieldValue::value(gql_val)))
604                    })
605                },
606            );
607            if let Some(desc) = &dim.description {
608                leaf_field = leaf_field.description(desc);
609            }
610            c.record_fields.push(leaf_field);
611            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
612
613            let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
614            c.orderby_items.push(format!("{path}_ASC"));
615            c.orderby_items.push(format!("{path}_DESC"));
616        }
617        DimensionNode::Group { graphql_name, description, children } => {
618            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
619            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
620            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
621
622            let mut child_record_fields: Vec<Field> = Vec::new();
623            let mut child_filter_fields: Vec<InputValue> = Vec::new();
624            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
625
626            let mut child_collector = DimCollector {
627                cube_name: c.cube_name,
628                record_fields: &mut child_record_fields,
629                filter_fields: &mut child_filter_fields,
630                orderby_items: c.orderby_items,
631                extra_objects: c.extra_objects,
632                extra_inputs: c.extra_inputs,
633            };
634            for child in children {
635                collect_dimension_types(child, &new_prefix, &mut child_collector);
636            }
637
638            let mut nested_record = Object::new(&nested_record_name);
639            for f in child_record_fields { nested_record = nested_record.field(f); }
640
641            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
642            let mut nested_filter = InputObject::new(&nested_filter_name)
643                .description(nested_filter_desc);
644            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
645
646            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
647                FieldFuture::new(async move {
648                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
649                    Ok(Some(FieldValue::owned_any(row.clone())))
650                })
651            });
652            if let Some(desc) = description {
653                group_field = group_field.description(desc);
654            }
655            c.record_fields.push(group_field);
656            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
657            c.extra_objects.push(nested_record);
658            c.extra_inputs.push(nested_filter);
659        }
660    }
661}
662
663fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
664    match dt {
665        DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
666        DimType::Int => TypeRef::named(TypeRef::INT),
667        DimType::Float => TypeRef::named(TypeRef::FLOAT),
668        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
669    }
670}
671
672fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
673    match dt {
674        DimType::String => "StringFilter",
675        DimType::Int => "IntFilter",
676        DimType::Float => "FloatFilter",
677        DimType::DateTime => "DateTimeFilter",
678        DimType::Bool => "BoolFilter",
679    }
680}
681
682pub fn json_to_gql_value(v: serde_json::Value) -> Value {
683    match v {
684        serde_json::Value::Null => Value::Null,
685        serde_json::Value::Bool(b) => Value::from(b),
686        serde_json::Value::Number(n) => {
687            if let Some(i) = n.as_i64() { Value::from(i) }
688            else if let Some(f) = n.as_f64() { Value::from(f) }
689            else { Value::from(n.to_string()) }
690        }
691        serde_json::Value::String(s) => Value::from(s),
692        _ => Value::from(v.to_string()),
693    }
694}
695
/// Build a JoinExpr from a JoinDef and target cube definition.
/// Inspects the sub-selection of the join field to determine which columns to SELECT.
fn build_join_expr(
    jd: &crate::cube::definition::JoinDef,
    target_cube: &CubeDefinition,
    sub_field: &async_graphql::SelectionField<'_>,
    network: &str,
    join_idx: usize,
) -> JoinExpr {
    let target_flat = target_cube.flat_dimensions();
    let target_table = target_cube.table_for_chain(network);

    // Flattened paths the client actually selected under the join field
    // (target cube's metrics are excluded from path collection).
    let mut requested_paths = HashSet::new();
    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);

    // SELECT only the requested target columns...
    let mut selects: Vec<SelectExpr> = target_flat.iter()
        .filter(|(path, _)| requested_paths.contains(path))
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // ...falling back to every target column when nothing matched.
    if selects.is_empty() {
        selects = target_flat.iter()
            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
            .collect();
    }

    // Heuristic: a '(' in any column expression means the target cube's
    // dimensions contain aggregate expressions, so the join side needs GROUP BY.
    let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));

    let group_by = if is_aggregate {
        // Group by the join keys (right-hand side of each condition) plus every
        // selected non-aggregate column, deduplicated.
        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
        for sel in &selects {
            if let SelectExpr::Column { column, .. } = sel {
                if !column.contains('(') && !gb.contains(column) {
                    gb.push(column.clone());
                }
            }
        }
        gb
    } else {
        vec![]
    };

    JoinExpr {
        schema: target_cube.schema.clone(),
        table: target_table,
        // Alias pattern "_jN" — the resolver in build_schema strips the
        // "{alias}." prefix when nesting join columns back into rows.
        alias: format!("_j{}", join_idx),
        conditions: jd.conditions.clone(),
        selects,
        group_by,
        use_final: target_cube.use_final,
        is_aggregate,
        target_cube: jd.target_cube.clone(),
        join_field: sub_field.name().to_string(),
        join_type: jd.join_type.clone(),
    }
}
755
756/// Convert a ClickHouse DateTime value to ISO 8601 format.
757/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
758fn json_to_gql_datetime(v: serde_json::Value) -> Value {
759    match v {
760        serde_json::Value::String(s) => {
761            let iso = if s.contains('T') {
762                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
763            } else {
764                let replaced = s.replacen(' ', "T", 1);
765                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
766            };
767            Value::from(iso)
768        }
769        other => json_to_gql_value(other),
770    }
771}