//! activecube_rs/schema/generator.rs
//!
//! Dynamic GraphQL schema generation (async-graphql) for cube queries.

1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Async function type that executes a compiled SQL query and returns rows.
/// The service layer provides this — the library never touches a database directly.
///
/// Arguments are the compiled SQL text and its positional bind values.
/// Resolves to the result rows, or a human-readable error string on failure.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
22
/// Configuration for supported networks (chains) and optional stats collection.
pub struct SchemaConfig {
    /// Values exposed by the GraphQL `Network` enum (e.g. "sol", "eth", "bsc").
    pub networks: Vec<String>,
    /// Intended name for the root query type.
    /// NOTE(review): not referenced by `build_schema`, which hard-codes
    /// "Query" as the root type name — confirm whether this is still needed.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    pub stats_callback: Option<StatsCallback>,
}
31
32impl Default for SchemaConfig {
33    fn default() -> Self {
34        Self {
35            networks: vec!["sol", "eth", "bsc"]
36                .into_iter().map(String::from).collect(),
37            root_query_name: "ChainStream".to_string(),
38            stats_callback: None,
39        }
40    }
41}
42
/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
///
/// Registers shared types (the `Network` enum, limit/order inputs, filter
/// primitives), then one top-level `Query` field per cube plus an internal
/// `_cubeMetadata` field. Each cube resolver parses the selection set into IR,
/// compiles it via `dialect`, runs it through `executor`, and reshapes the
/// result rows so the generated field resolvers can read them.
///
/// NOTE(review): `config.root_query_name` is not used here — the root type name
/// is hard-coded to "Query". Confirm whether the config field is still needed.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    let mut builder = Schema::build("Query", None, None);

    // Shared `Network` enum: one item per configured chain.
    let mut network_enum = Enum::new("Network")
        .description("Blockchain network to query");
    for net in &config.networks {
        network_enum = network_enum.item(EnumItem::new(net));
    }
    builder = builder.register(network_enum);
    builder = builder.register(filter_types::build_limit_input());

    builder = builder.register(
        InputObject::new("LimitByInput")
            .description("Limit results per group (similar to ClickHouse LIMIT BY)")
            .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
                .description("Comma-separated dimension names to group by"))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
                .description("Maximum rows per group"))
            .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
                .description("Rows to skip per group")),
    );

    builder = builder.register(
        Enum::new("OrderDirection")
            .description("Sort direction")
            .item(EnumItem::new("ASC").description("Ascending"))
            .item(EnumItem::new("DESC").description("Descending")),
    );

    // Shared scalar filter inputs (StringFilter, IntFilter, ...).
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }

    // Cubes are top-level Query fields, each with a required `network` argument.
    // Query pattern: `query { DEXTrades(network: sol, limit: ...) { ... } }`
    let mut query = Object::new("Query");

    for cube in registry.cubes() {
        // Register every generated type for this cube before wiring the field.
        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }

        // Owned clones captured by the resolver closure below.
        let cube_name = cube.name.clone();
        let dialect_clone = dialect.clone();
        let executor_clone = executor.clone();
        let stats_cb = config.stats_callback.clone();

        let orderby_list_input_name = format!("{}OrderByInput", cube.name);

        let cube_description = cube.description.clone();
        let mut field = Field::new(
            &cube.name,
            TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
            move |ctx| {
                // Re-clone per invocation: the closure may run many times.
                let cube_name = cube_name.clone();
                let dialect = dialect_clone.clone();
                let executor = executor_clone.clone();
                let stats_cb = stats_cb.clone();
                FieldFuture::new(async move {
                    let registry = ctx.ctx.data::<CubeRegistry>()?;
                    let network_val = ctx.args.try_get("network")?;
                    let network = network_val.enum_name()
                        .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;

                    let cube_def = registry.get(&cube_name).ok_or_else(|| {
                        async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                    })?;

                    // Translate the GraphQL selection + arguments into query IR.
                    let metric_requests = extract_metric_requests(&ctx, cube_def);
                    let requested = extract_requested_fields(&ctx, cube_def);
                    let mut ir = compiler::parser::parse_cube_query(
                        cube_def,
                        network,
                        &ctx.args,
                        &metric_requests,
                        Some(requested),
                    )?;

                    // Resolve join fields from the selection set
                    let mut join_idx = 0usize;
                    for sub_field in ctx.ctx.field().selection_set() {
                        let fname = sub_field.name().to_string();
                        let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
                        if let Some(jd) = join_def {
                            // Unknown target cubes are silently skipped.
                            if let Some(target_cube) = registry.get(&jd.target_cube) {
                                let join_expr = build_join_expr(
                                    jd, target_cube, &sub_field, network, join_idx,
                                );
                                ir.joins.push(join_expr);
                                join_idx += 1;
                            }
                        }
                    }

                    // Validate, compile to dialect SQL, and execute.
                    let validated = compiler::validator::validate(ir)?;
                    let result = dialect.compile(&validated);
                    let sql = result.sql;
                    let bindings = result.bindings;

                    let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                        async_graphql::Error::new(format!("Query execution failed: {e}"))
                    })?;

                    // Remap aliased columns back to original names for resolvers
                    let rows = if result.alias_remap.is_empty() {
                        rows
                    } else {
                        rows.into_iter().map(|mut row| {
                            for (alias, original) in &result.alias_remap {
                                if let Some(val) = row.shift_remove(alias) {
                                    // or_insert: never clobber an existing column.
                                    row.entry(original.clone()).or_insert(val);
                                }
                            }
                            row
                        }).collect()
                    };

                    // Restructure join data: extract _jN.xxx columns into nested objects
                    let rows: Vec<RowMap> = if validated.joins.is_empty() {
                        rows
                    } else {
                        rows.into_iter().map(|mut row| {
                            for join in &validated.joins {
                                let prefix = format!("{}.", join.alias);
                                let mut sub_row = RowMap::new();
                                // Collect keys first: can't mutate while iterating.
                                let keys: Vec<String> = row.keys()
                                    .filter(|k| k.starts_with(&prefix))
                                    .cloned()
                                    .collect();
                                for key in keys {
                                    if let Some(val) = row.shift_remove(&key) {
                                        // Strip the "_jN." prefix for the nested object key.
                                        sub_row.insert(key[prefix.len()..].to_string(), val);
                                    }
                                }
                                let obj: serde_json::Map<String, serde_json::Value> =
                                    sub_row.into_iter().collect();
                                row.insert(
                                    join.join_field.clone(),
                                    serde_json::Value::Object(obj),
                                );
                            }
                            row
                        }).collect()
                    };

                    // Stats: a callback in request data overrides the configured one.
                    let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                        .or_else(|| stats_cb.clone());
                    if let Some(cb) = effective_cb {
                        let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                        cb(stats);
                    }

                    // Each row becomes an owned value downcast by child resolvers.
                    let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                    Ok(Some(FieldValue::list(values)))
                })
            },
        );
        if !cube_description.is_empty() {
            field = field.description(&cube_description);
        }
        field = field
            .argument(InputValue::new("network", TypeRef::named_nn("Network"))
                .description("Blockchain network to query"))
            .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
                .description("Filter conditions"))
            .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
                .description("Pagination control"))
            .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
                .description("Per-group row limit"))
            .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name)))
                .description("Sort order (single column)"))
            .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name))
                .description("Sort order (multiple columns)"));

        // Selector shorthands: top-level filter arguments typed per dimension.
        for sel in &cube.selectors {
            let filter_type = dim_type_to_filter_name(&sel.dim_type);
            field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
                .description(format!("Shorthand filter for {}", sel.graphql_name)));
        }

        query = query.field(field);
    }

    // Internal introspection endpoint: all cube definitions as a JSON string.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            FieldFuture::new(async move {
                    let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
                    serde_json::json!({
                        "name": cube.name,
                        "description": cube.description,
                        "schema": cube.schema,
                        "tablePattern": cube.table_pattern,
                        "metrics": cube.metrics,
                        "selectors": cube.selectors.iter().map(|s| {
                            serde_json::json!({
                                "name": s.graphql_name,
                                "column": s.column,
                                "type": format!("{:?}", s.dim_type),
                            })
                        }).collect::<Vec<_>>(),
                        "dimensions": serialize_dims(&cube.dimensions),
                        "joins": cube.joins.iter().map(|j| {
                            serde_json::json!({
                                "field": j.field_name,
                                "target": j.target_cube,
                            })
                        }).collect::<Vec<_>>(),
                        "defaultLimit": cube.default_limit,
                        "maxLimit": cube.max_limit,
                    })
                }).collect();
                let json = serde_json::to_string(&metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    )
    .description("Internal: returns JSON metadata about all cubes");
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Make the registry available to resolvers via context data.
    builder = builder.data(registry);

    builder.finish()
}
278
279fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
280    serde_json::Value::Array(dims.iter().map(|d| match d {
281        DimensionNode::Leaf(dim) => {
282            let mut obj = serde_json::json!({
283                "name": dim.graphql_name,
284                "column": dim.column,
285                "type": format!("{:?}", dim.dim_type),
286            });
287            if let Some(desc) = &dim.description {
288                obj["description"] = serde_json::Value::String(desc.clone());
289            }
290            obj
291        },
292        DimensionNode::Group { graphql_name, description, children } => {
293            let mut obj = serde_json::json!({
294                "name": graphql_name,
295                "children": serialize_dims(children),
296            });
297            if let Some(desc) = description {
298                obj["description"] = serde_json::Value::String(desc.clone());
299            }
300            obj
301        },
302    }).collect())
303}
304
305/// Extract metric requests from the GraphQL selection set by inspecting
306/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
307/// the "count" field in the selection set and extract its `of` argument.
308fn extract_metric_requests(
309    ctx: &async_graphql::dynamic::ResolverContext,
310    cube: &CubeDefinition,
311) -> Vec<compiler::parser::MetricRequest> {
312    let mut requests = Vec::new();
313
314    for sub_field in ctx.ctx.field().selection_set() {
315        let name = sub_field.name();
316        if !cube.metrics.contains(&name.to_string()) {
317            continue;
318        }
319
320        let args = match sub_field.arguments() {
321            Ok(args) => args,
322            Err(_) => continue,
323        };
324
325        let of_dimension = args
326            .iter()
327            .find(|(k, _)| k.as_str() == "of")
328            .and_then(|(_, v)| match v {
329                async_graphql::Value::Enum(e) => Some(e.to_string()),
330                async_graphql::Value::String(s) => Some(s.clone()),
331                _ => None,
332            })
333            .unwrap_or_else(|| "*".to_string());
334
335        let select_where_value = args
336            .iter()
337            .find(|(k, _)| k.as_str() == "selectWhere")
338            .map(|(_, v)| v.clone());
339
340        let condition_filter = args
341            .iter()
342            .find(|(k, _)| k.as_str() == "if")
343            .and_then(|(_, v)| {
344                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
345                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
346            });
347
348        requests.push(compiler::parser::MetricRequest {
349            function: name.to_string(),
350            of_dimension,
351            select_where_value,
352            condition_filter,
353        });
354    }
355
356    requests
357}
358
359fn extract_requested_fields(
360    ctx: &async_graphql::dynamic::ResolverContext,
361    cube: &CubeDefinition,
362) -> HashSet<String> {
363    let mut fields = HashSet::new();
364    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
365    fields
366}
367
368fn collect_selection_paths(
369    field: &async_graphql::SelectionField<'_>,
370    prefix: &str,
371    out: &mut HashSet<String>,
372    metrics: &[String],
373) {
374    for sub in field.selection_set() {
375        let name = sub.name();
376        if metrics.iter().any(|m| m == name) {
377            continue;
378        }
379        let path = if prefix.is_empty() {
380            name.to_string()
381        } else {
382            format!("{prefix}_{name}")
383        };
384        let has_children = sub.selection_set().next().is_some();
385        if has_children {
386            collect_selection_paths(&sub, &path, out, metrics);
387        } else {
388            out.insert(path);
389        }
390    }
391}
392
393// ---------------------------------------------------------------------------
394// Per-Cube GraphQL type generation
395// ---------------------------------------------------------------------------
396
/// Generated GraphQL types for one cube: output objects (Record types),
/// input objects (filters, order-by inputs) and enums (order-by items,
/// per-metric `of` dimensions). All are registered by `build_schema`.
struct CubeTypes {
    objects: Vec<Object>,
    inputs: Vec<InputObject>,
    enums: Vec<Enum>,
}
402
403fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
404    let record_name = format!("{}Record", cube.name);
405    let filter_name = format!("{}Filter", cube.name);
406    let orderby_name = format!("{}OrderBy", cube.name);
407
408    let mut record_fields: Vec<Field> = Vec::new();
409    let mut filter_fields: Vec<InputValue> = Vec::new();
410    let mut orderby_items: Vec<String> = Vec::new();
411    let mut extra_objects: Vec<Object> = Vec::new();
412    let mut extra_inputs: Vec<InputObject> = Vec::new();
413
414    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
415        .description("OR combinator — matches if any sub-filter matches"));
416
417    {
418        let mut collector = DimCollector {
419            cube_name: &cube.name,
420            record_fields: &mut record_fields,
421            filter_fields: &mut filter_fields,
422            orderby_items: &mut orderby_items,
423            extra_objects: &mut extra_objects,
424            extra_inputs: &mut extra_inputs,
425        };
426        for node in &cube.dimensions {
427            collect_dimension_types(node, "", &mut collector);
428        }
429    }
430
431    let flat_dims = cube.flat_dimensions();
432    let mut metric_enums: Vec<Enum> = Vec::new();
433    let metric_descriptions: std::collections::HashMap<&str, &str> = [
434        ("count", "Count of rows or distinct values"),
435        ("sum", "Sum of values"),
436        ("avg", "Average of values"),
437        ("min", "Minimum value"),
438        ("max", "Maximum value"),
439        ("uniq", "Count of unique (distinct) values"),
440    ].into_iter().collect();
441
442    for metric in &cube.metrics {
443        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
444        extra_inputs.push(
445            InputObject::new(&select_where_name)
446                .description(format!("Post-aggregation filter for {} (HAVING clause)", metric))
447                .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
448                .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
449                .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
450                .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
451                .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
452        );
453
454        let of_enum_name = format!("{}_{}_Of", cube.name, metric);
455        let mut of_enum = Enum::new(&of_enum_name)
456            .description(format!("Dimension to apply {} aggregation on", metric));
457        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
458        metric_enums.push(of_enum);
459
460        let metric_clone = metric.clone();
461        let metric_desc = metric_descriptions.get(metric.as_str())
462            .copied()
463            .unwrap_or("Aggregate metric");
464        let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
465            let metric_key = metric_clone.clone();
466            FieldFuture::new(async move {
467                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
468                let key = format!("__{metric_key}");
469                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
470                Ok(Some(FieldValue::value(json_to_gql_value(val))))
471            })
472        })
473        .description(metric_desc)
474        .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
475            .description("Dimension to aggregate on (default: all rows)"))
476        .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
477            .description("Post-aggregation filter (HAVING)"))
478        .argument(InputValue::new("if", TypeRef::named(&filter_name))
479            .description("Conditional filter for this metric"));
480
481        record_fields.push(metric_field);
482    }
483
484    // Add join fields: joinXxx returns the target cube's Record type
485    for jd in &cube.joins {
486        let target_record_name = format!("{}Record", jd.target_cube);
487        let field_name_owned = jd.field_name.clone();
488        let mut join_field = Field::new(
489            &jd.field_name,
490            TypeRef::named(&target_record_name),
491            move |ctx| {
492                let field_name = field_name_owned.clone();
493                FieldFuture::new(async move {
494                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
495                    if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
496                        let sub_row: RowMap = obj.iter()
497                            .map(|(k, v)| (k.clone(), v.clone()))
498                            .collect();
499                        Ok(Some(FieldValue::owned_any(sub_row)))
500                    } else {
501                        Ok(Some(FieldValue::value(Value::Null)))
502                    }
503                })
504            },
505        );
506        if let Some(desc) = &jd.description {
507            join_field = join_field.description(desc);
508        }
509        record_fields.push(join_field);
510    }
511
512    let mut record = Object::new(&record_name);
513    for f in record_fields { record = record.field(f); }
514
515    let mut filter = InputObject::new(&filter_name)
516        .description(format!("Filter conditions for {} query", cube.name));
517    for f in filter_fields { filter = filter.field(f); }
518
519    let mut orderby = Enum::new(&orderby_name)
520        .description(format!("Sort order for {} results (single column)", cube.name));
521    for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
522
523    // Multi-column orderBy: {Cube}OrderBy_Field enum + {Cube}OrderByInput
524    let field_enum_name = format!("{}_Field", orderby_name);
525    let orderby_input_name = format!("{}OrderByInput", cube.name);
526    let mut field_enum = Enum::new(&field_enum_name)
527        .description(format!("Available fields for {} multi-column sort", cube.name));
528    let flat_dims = cube.flat_dimensions();
529    for (path, _) in &flat_dims {
530        field_enum = field_enum.item(EnumItem::new(path));
531    }
532    let orderby_input = InputObject::new(&orderby_input_name)
533        .description(format!("Multi-column sort input for {}", cube.name))
534        .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name))
535            .description("Field to sort by"))
536        .field(InputValue::new("direction", TypeRef::named("OrderDirection"))
537            .description("Sort direction (ASC or DESC)"));
538
539    let mut objects = vec![record]; objects.extend(extra_objects);
540    let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
541    let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
542
543    CubeTypes { objects, inputs, enums }
544}
545
/// Mutable accumulator threaded through the dimension-tree walk.
/// Record/filter fields are scoped to the object currently being built
/// (a nested group gets fresh vectors), while orderBy items and extra
/// nested types always accumulate at the cube level.
struct DimCollector<'a> {
    // Cube name, used to namespace nested Record/Filter type names.
    cube_name: &'a str,
    // Output fields of the Record object currently being built.
    record_fields: &'a mut Vec<Field>,
    // Input fields of the Filter object currently being built.
    filter_fields: &'a mut Vec<InputValue>,
    // Flat `{path}_ASC` / `{path}_DESC` items for the single-column orderBy enum.
    orderby_items: &'a mut Vec<String>,
    // Nested Record objects generated for dimension groups.
    extra_objects: &'a mut Vec<Object>,
    // Nested Filter inputs generated for dimension groups.
    extra_inputs: &'a mut Vec<InputObject>,
}
554
555fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
556    match node {
557        DimensionNode::Leaf(dim) => {
558            let col = dim.column.clone();
559            let is_datetime = dim.dim_type == DimType::DateTime;
560            let mut leaf_field = Field::new(
561                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
562                move |ctx| {
563                    let col = col.clone();
564                    FieldFuture::new(async move {
565                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
566                        let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
567                        let gql_val = if is_datetime {
568                            json_to_gql_datetime(val)
569                        } else {
570                            json_to_gql_value(val)
571                        };
572                        Ok(Some(FieldValue::value(gql_val)))
573                    })
574                },
575            );
576            if let Some(desc) = &dim.description {
577                leaf_field = leaf_field.description(desc);
578            }
579            c.record_fields.push(leaf_field);
580            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
581
582            let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
583            c.orderby_items.push(format!("{path}_ASC"));
584            c.orderby_items.push(format!("{path}_DESC"));
585        }
586        DimensionNode::Group { graphql_name, description, children } => {
587            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
588            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
589            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
590
591            let mut child_record_fields: Vec<Field> = Vec::new();
592            let mut child_filter_fields: Vec<InputValue> = Vec::new();
593            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
594
595            let mut child_collector = DimCollector {
596                cube_name: c.cube_name,
597                record_fields: &mut child_record_fields,
598                filter_fields: &mut child_filter_fields,
599                orderby_items: c.orderby_items,
600                extra_objects: c.extra_objects,
601                extra_inputs: c.extra_inputs,
602            };
603            for child in children {
604                collect_dimension_types(child, &new_prefix, &mut child_collector);
605            }
606
607            let mut nested_record = Object::new(&nested_record_name);
608            for f in child_record_fields { nested_record = nested_record.field(f); }
609
610            let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
611            let mut nested_filter = InputObject::new(&nested_filter_name)
612                .description(nested_filter_desc);
613            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
614
615            let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
616                FieldFuture::new(async move {
617                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
618                    Ok(Some(FieldValue::owned_any(row.clone())))
619                })
620            });
621            if let Some(desc) = description {
622                group_field = group_field.description(desc);
623            }
624            c.record_fields.push(group_field);
625            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
626            c.extra_objects.push(nested_record);
627            c.extra_inputs.push(nested_filter);
628        }
629    }
630}
631
632fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
633    match dt {
634        DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
635        DimType::Int => TypeRef::named(TypeRef::INT),
636        DimType::Float => TypeRef::named(TypeRef::FLOAT),
637        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
638    }
639}
640
641fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
642    match dt {
643        DimType::String => "StringFilter",
644        DimType::Int => "IntFilter",
645        DimType::Float => "FloatFilter",
646        DimType::DateTime => "DateTimeFilter",
647        DimType::Bool => "BoolFilter",
648    }
649}
650
651pub fn json_to_gql_value(v: serde_json::Value) -> Value {
652    match v {
653        serde_json::Value::Null => Value::Null,
654        serde_json::Value::Bool(b) => Value::from(b),
655        serde_json::Value::Number(n) => {
656            if let Some(i) = n.as_i64() { Value::from(i) }
657            else if let Some(f) = n.as_f64() { Value::from(f) }
658            else { Value::from(n.to_string()) }
659        }
660        serde_json::Value::String(s) => Value::from(s),
661        _ => Value::from(v.to_string()),
662    }
663}
664
/// Build a JoinExpr from a JoinDef and target cube definition.
/// Inspects the sub-selection of the join field to determine which columns to SELECT.
///
/// The alias is `_jN` (N = `join_idx`); the cube resolver later strips the
/// `_jN.` prefix from result columns to build the nested join object.
fn build_join_expr(
    jd: &crate::cube::definition::JoinDef,
    target_cube: &CubeDefinition,
    sub_field: &async_graphql::SelectionField<'_>,
    network: &str,
    join_idx: usize,
) -> JoinExpr {
    let target_flat = target_cube.flat_dimensions();
    let target_table = target_cube.table_for_chain(network);

    // Flattened paths the client actually selected under the join field.
    let mut requested_paths = HashSet::new();
    collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);

    // SELECT only the requested target columns ...
    let mut selects: Vec<SelectExpr> = target_flat.iter()
        .filter(|(path, _)| requested_paths.contains(path))
        .map(|(_, dim)| SelectExpr::Column {
            column: dim.column.clone(),
            alias: None,
        })
        .collect();

    // ... falling back to all target columns when nothing matched.
    if selects.is_empty() {
        selects = target_flat.iter()
            .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
            .collect();
    }

    // Heuristic: a '(' in any column expression marks an aggregate target cube.
    let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));

    // Aggregate targets need a GROUP BY: join keys first, then every selected
    // non-aggregate column (deduplicated).
    let group_by = if is_aggregate {
        let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
        for sel in &selects {
            if let SelectExpr::Column { column, .. } = sel {
                if !column.contains('(') && !gb.contains(column) {
                    gb.push(column.clone());
                }
            }
        }
        gb
    } else {
        vec![]
    };

    JoinExpr {
        schema: target_cube.schema.clone(),
        table: target_table,
        alias: format!("_j{}", join_idx),
        conditions: jd.conditions.clone(),
        selects,
        group_by,
        use_final: target_cube.use_final,
        is_aggregate,
        target_cube: jd.target_cube.clone(),
        join_field: sub_field.name().to_string(),
    }
}
723
724/// Convert a ClickHouse DateTime value to ISO 8601 format.
725/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
726fn json_to_gql_datetime(v: serde_json::Value) -> Value {
727    match v {
728        serde_json::Value::String(s) => {
729            let iso = if s.contains('T') {
730                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
731            } else {
732                let replaced = s.replacen(' ', "T", 1);
733                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
734            };
735            Value::from(iso)
736        }
737        other => json_to_gql_value(other),
738    }
739}