// activecube_rs/schema/generator.rs
1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::SqlValue;
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Async function type that executes a compiled SQL query and returns rows.
/// Takes the SQL text plus its positional bind values; resolves to either the
/// result rows or an error message string.
/// The service layer provides this — the library never touches a database directly.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
22
/// Configuration for supported networks (chains) and optional stats collection.
pub struct SchemaConfig {
    /// Network identifiers exposed as items of the generated `Network` enum.
    pub networks: Vec<String>,
    /// Intended name for the root query type.
    /// NOTE(review): `build_schema` hardcodes "Query" and never reads this
    /// field — confirm whether it should be wired through.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    pub stats_callback: Option<StatsCallback>,
}
31
32impl Default for SchemaConfig {
33    fn default() -> Self {
34        Self {
35            networks: vec!["sol", "eth", "bsc"]
36                .into_iter().map(String::from).collect(),
37            root_query_name: "ChainStream".to_string(),
38            stats_callback: None,
39        }
40    }
41}
42
/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
///
/// * `registry` — cube definitions; also stored as schema data so resolvers can
///   look cubes up per request.
/// * `dialect`  — compiles validated IR into SQL text plus bind values.
/// * `executor` — service-provided async closure that actually runs the SQL.
/// * `config`   — supported networks and optional build-time stats callback.
///
/// NOTE(review): `config.root_query_name` is never read here — the root type
/// name is hardcoded to "Query" below. Confirm whether it should be used.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    let mut builder = Schema::build("Query", None, None);

    // Network enum — one item per configured network (e.g. sol/eth/bsc).
    let mut network_enum = Enum::new("Network");
    for net in &config.networks {
        network_enum = network_enum.item(EnumItem::new(net));
    }
    builder = builder.register(network_enum);
    builder = builder.register(filter_types::build_limit_input());

    // LimitByInput: per-group limiting (required `by` column and `count`,
    // optional `offset`).
    builder = builder.register(
        InputObject::new("LimitByInput")
            .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING)))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)))
            .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))),
    );

    // Global OrderDirection enum for multi-column orderBy
    builder = builder.register(
        Enum::new("OrderDirection")
            .item(EnumItem::new("ASC"))
            .item(EnumItem::new("DESC")),
    );

    // Shared scalar filter inputs (StringFilter, IntFilter, ...).
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }

    // Cubes are top-level Query fields, each with a required `network` argument.
    // Query pattern: `query { DEXTrades(network: sol, limit: ...) { ... } }`
    let mut query = Object::new("Query");

    for cube in registry.cubes() {
        // Register all per-cube generated types before wiring the field.
        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }

        // Owned clones moved into the resolver closure below.
        let cube_name = cube.name.clone();
        let dialect_clone = dialect.clone();
        let executor_clone = executor.clone();
        let stats_cb = config.stats_callback.clone();

        let orderby_list_input_name = format!("{}OrderByInput", cube.name);

        let mut field = Field::new(
            &cube.name,
            TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
            move |ctx| {
                // Per-invocation clones — the closure runs once per request.
                let cube_name = cube_name.clone();
                let dialect = dialect_clone.clone();
                let executor = executor_clone.clone();
                let stats_cb = stats_cb.clone();
                FieldFuture::new(async move {
                    let registry = ctx.ctx.data::<CubeRegistry>()?;
                    // The required `network` argument must be a Network enum value.
                    let network_val = ctx.args.try_get("network")?;
                    let network = network_val.enum_name()
                        .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;

                    let cube_def = registry.get(&cube_name).ok_or_else(|| {
                        async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                    })?;

                    // Inspect the selection set to learn which metrics and
                    // dimension fields the client asked for, then
                    // parse -> validate -> compile to SQL.
                    let metric_requests = extract_metric_requests(&ctx, cube_def);
                    let requested = extract_requested_fields(&ctx, cube_def);
                    let ir = compiler::parser::parse_cube_query(
                        cube_def,
                        network,
                        &ctx.args,
                        &metric_requests,
                        Some(requested),
                    )?;
                    let validated = compiler::validator::validate(ir)?;
                    let (sql, bindings) = dialect.compile(&validated);

                    let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                        async_graphql::Error::new(format!("Query execution failed: {e}"))
                    })?;

                    // A request-scoped callback (installed as schema/request
                    // data) takes precedence over the build-time one.
                    let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                        .or_else(|| stats_cb.clone());
                    if let Some(cb) = effective_cb {
                        let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                        cb(stats);
                    }

                    // Rows are handed down as owned values; child resolvers
                    // downcast back to RowMap to read their columns.
                    let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                    Ok(Some(FieldValue::list(values)))
                })
            },
        );
        field = field
            .argument(InputValue::new("network", TypeRef::named_nn("Network")))
            .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name))))
            .argument(InputValue::new("limit", TypeRef::named("LimitInput")))
            .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput")))
            .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name))))
            .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name)));

        // Each selector dimension becomes its own top-level filter argument.
        for sel in &cube.selectors {
            let filter_type = dim_type_to_filter_name(&sel.dim_type);
            field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type)));
        }

        query = query.field(field);
    }

    // Feature 3: _cubeMetadata introspection field
    // Returns a JSON string describing every registered cube.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            FieldFuture::new(async move {
                let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
                    serde_json::json!({
                        "name": cube.name,
                        "schema": cube.schema,
                        "tablePattern": cube.table_pattern,
                        "metrics": cube.metrics,
                        "selectors": cube.selectors.iter().map(|s| {
                            serde_json::json!({
                                "name": s.graphql_name,
                                "column": s.column,
                                "type": format!("{:?}", s.dim_type),
                            })
                        }).collect::<Vec<_>>(),
                        "dimensions": serialize_dims(&cube.dimensions),
                        "defaultLimit": cube.default_limit,
                        "maxLimit": cube.max_limit,
                    })
                }).collect();
                let json = serde_json::to_string(&metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    );
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Resolvers retrieve the registry from schema data at request time.
    builder = builder.data(registry);

    builder.finish()
}
195
196fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
197    serde_json::Value::Array(dims.iter().map(|d| match d {
198        DimensionNode::Leaf(dim) => serde_json::json!({
199            "name": dim.graphql_name,
200            "column": dim.column,
201            "type": format!("{:?}", dim.dim_type),
202        }),
203        DimensionNode::Group { graphql_name, children } => serde_json::json!({
204            "name": graphql_name,
205            "children": serialize_dims(children),
206        }),
207    }).collect())
208}
209
210/// Extract metric requests from the GraphQL selection set by inspecting
211/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
212/// the "count" field in the selection set and extract its `of` argument.
213fn extract_metric_requests(
214    ctx: &async_graphql::dynamic::ResolverContext,
215    cube: &CubeDefinition,
216) -> Vec<compiler::parser::MetricRequest> {
217    let mut requests = Vec::new();
218
219    for sub_field in ctx.ctx.field().selection_set() {
220        let name = sub_field.name();
221        if !cube.metrics.contains(&name.to_string()) {
222            continue;
223        }
224
225        let args = match sub_field.arguments() {
226            Ok(args) => args,
227            Err(_) => continue,
228        };
229
230        let of_dimension = args
231            .iter()
232            .find(|(k, _)| k.as_str() == "of")
233            .and_then(|(_, v)| match v {
234                async_graphql::Value::Enum(e) => Some(e.to_string()),
235                async_graphql::Value::String(s) => Some(s.clone()),
236                _ => None,
237            })
238            .unwrap_or_else(|| "*".to_string());
239
240        let select_where_value = args
241            .iter()
242            .find(|(k, _)| k.as_str() == "selectWhere")
243            .map(|(_, v)| v.clone());
244
245        let condition_filter = args
246            .iter()
247            .find(|(k, _)| k.as_str() == "if")
248            .and_then(|(_, v)| {
249                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
250                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
251            });
252
253        requests.push(compiler::parser::MetricRequest {
254            function: name.to_string(),
255            of_dimension,
256            select_where_value,
257            condition_filter,
258        });
259    }
260
261    requests
262}
263
264fn extract_requested_fields(
265    ctx: &async_graphql::dynamic::ResolverContext,
266    cube: &CubeDefinition,
267) -> HashSet<String> {
268    let mut fields = HashSet::new();
269    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
270    fields
271}
272
273fn collect_selection_paths(
274    field: &async_graphql::SelectionField<'_>,
275    prefix: &str,
276    out: &mut HashSet<String>,
277    metrics: &[String],
278) {
279    for sub in field.selection_set() {
280        let name = sub.name();
281        if metrics.iter().any(|m| m == name) {
282            continue;
283        }
284        let path = if prefix.is_empty() {
285            name.to_string()
286        } else {
287            format!("{prefix}_{name}")
288        };
289        let has_children = sub.selection_set().next().is_some();
290        if has_children {
291            collect_selection_paths(&sub, &path, out, metrics);
292        } else {
293            out.insert(path);
294        }
295    }
296}
297
298// ---------------------------------------------------------------------------
299// Per-Cube GraphQL type generation
300// ---------------------------------------------------------------------------
301
/// All GraphQL types generated for a single cube, ready to be registered
/// on the schema builder.
struct CubeTypes {
    // Record object plus nested group record objects.
    objects: Vec<Object>,
    // Filter/orderBy/selectWhere input objects.
    inputs: Vec<InputObject>,
    // OrderBy enums and per-metric `of` enums.
    enums: Vec<Enum>,
}
307
308fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
309    let record_name = format!("{}Record", cube.name);
310    let filter_name = format!("{}Filter", cube.name);
311    let orderby_name = format!("{}OrderBy", cube.name);
312
313    let mut record_fields: Vec<Field> = Vec::new();
314    let mut filter_fields: Vec<InputValue> = Vec::new();
315    let mut orderby_items: Vec<String> = Vec::new();
316    let mut extra_objects: Vec<Object> = Vec::new();
317    let mut extra_inputs: Vec<InputObject> = Vec::new();
318
319    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name)));
320
321    {
322        let mut collector = DimCollector {
323            cube_name: &cube.name,
324            record_fields: &mut record_fields,
325            filter_fields: &mut filter_fields,
326            orderby_items: &mut orderby_items,
327            extra_objects: &mut extra_objects,
328            extra_inputs: &mut extra_inputs,
329        };
330        for node in &cube.dimensions {
331            collect_dimension_types(node, "", &mut collector);
332        }
333    }
334
335    let flat_dims = cube.flat_dimensions();
336    let mut metric_enums: Vec<Enum> = Vec::new();
337    for metric in &cube.metrics {
338        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
339        extra_inputs.push(
340            InputObject::new(&select_where_name)
341                .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
342                .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)))
343                .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
344                .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)))
345                .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))),
346        );
347
348        let of_enum_name = format!("{}_{}_Of", cube.name, metric);
349        let mut of_enum = Enum::new(&of_enum_name);
350        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
351        metric_enums.push(of_enum);
352
353        let metric_clone = metric.clone();
354        let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
355            let metric_key = metric_clone.clone();
356            FieldFuture::new(async move {
357                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
358                let key = format!("__{metric_key}");
359                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
360                Ok(Some(FieldValue::value(json_to_gql_value(val))))
361            })
362        })
363        .argument(InputValue::new("of", TypeRef::named(&of_enum_name)))
364        .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name)))
365        .argument(InputValue::new("if", TypeRef::named(&filter_name)));
366
367        record_fields.push(metric_field);
368    }
369
370    let mut record = Object::new(&record_name);
371    for f in record_fields { record = record.field(f); }
372
373    let mut filter = InputObject::new(&filter_name);
374    for f in filter_fields { filter = filter.field(f); }
375
376    let mut orderby = Enum::new(&orderby_name);
377    for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
378
379    // Multi-column orderBy: {Cube}OrderBy_Field enum + {Cube}OrderByInput
380    let field_enum_name = format!("{}_Field", orderby_name);
381    let orderby_input_name = format!("{}OrderByInput", cube.name);
382    let mut field_enum = Enum::new(&field_enum_name);
383    let flat_dims = cube.flat_dimensions();
384    for (path, _) in &flat_dims {
385        field_enum = field_enum.item(EnumItem::new(path));
386    }
387    let orderby_input = InputObject::new(&orderby_input_name)
388        .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name)))
389        .field(InputValue::new("direction", TypeRef::named("OrderDirection")));
390
391    let mut objects = vec![record]; objects.extend(extra_objects);
392    let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
393    let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
394
395    CubeTypes { objects, inputs, enums }
396}
397
/// Mutable output buffers threaded through the dimension-tree walk.
/// Groups create child collectors that keep record/filter fields group-local
/// while sharing the orderBy and extra-type sinks with the parent.
struct DimCollector<'a> {
    cube_name: &'a str,
    // Record-object fields for the current level (cube root or group).
    record_fields: &'a mut Vec<Field>,
    // Filter-input fields for the current level.
    filter_fields: &'a mut Vec<InputValue>,
    // Flat `{path}_ASC` / `{path}_DESC` items — shared across all levels.
    orderby_items: &'a mut Vec<String>,
    // Nested group record objects — shared across all levels.
    extra_objects: &'a mut Vec<Object>,
    // Nested group filter inputs — shared across all levels.
    extra_inputs: &'a mut Vec<InputObject>,
}
406
/// Recursively walk one dimension node, appending generated record fields,
/// filter inputs and orderBy items into the collector.
///
/// `prefix` is the underscore-joined path of ancestor group names ("" at the
/// top level); it feeds orderBy item names and nested type names.
fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
    match node {
        DimensionNode::Leaf(dim) => {
            // Resolver captures the column name and reads it from the
            // downcast RowMap; DateTime columns get ISO-8601 normalization.
            let col = dim.column.clone();
            let is_datetime = dim.dim_type == DimType::DateTime;
            let leaf_field = Field::new(
                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
                move |ctx| {
                    let col = col.clone();
                    FieldFuture::new(async move {
                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                        let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
                        let gql_val = if is_datetime {
                            json_to_gql_datetime(val)
                        } else {
                            json_to_gql_value(val)
                        };
                        Ok(Some(FieldValue::value(gql_val)))
                    })
                },
            );
            c.record_fields.push(leaf_field);
            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));

            // Every leaf contributes ASC/DESC variants to the orderBy enum.
            let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
            c.orderby_items.push(format!("{path}_ASC"));
            c.orderby_items.push(format!("{path}_DESC"));
        }
        DimensionNode::Group { graphql_name, children } => {
            // Groups become nested types named {Cube}_{path}_Record and
            // {Cube}_{path}_Filter.
            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);

            let mut child_record_fields: Vec<Field> = Vec::new();
            let mut child_filter_fields: Vec<InputValue> = Vec::new();
            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };

            // Child collector: group-local record/filter buffers, shared
            // orderBy/extra-type sinks (reborrowed from the parent).
            let mut child_collector = DimCollector {
                cube_name: c.cube_name,
                record_fields: &mut child_record_fields,
                filter_fields: &mut child_filter_fields,
                orderby_items: c.orderby_items,
                extra_objects: c.extra_objects,
                extra_inputs: c.extra_inputs,
            };
            for child in children {
                collect_dimension_types(child, &new_prefix, &mut child_collector);
            }

            let mut nested_record = Object::new(&nested_record_name);
            for f in child_record_fields { nested_record = nested_record.field(f); }

            let mut nested_filter = InputObject::new(&nested_filter_name);
            for f in child_filter_fields { nested_filter = nested_filter.field(f); }

            // The group resolver forwards a clone of the whole row; nested
            // leaf resolvers index it by their own column names.
            let group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
                FieldFuture::new(async move {
                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
                    Ok(Some(FieldValue::owned_any(row.clone())))
                })
            });
            c.record_fields.push(group_field);
            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
            c.extra_objects.push(nested_record);
            c.extra_inputs.push(nested_filter);
        }
    }
}
475
476fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
477    match dt {
478        DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
479        DimType::Int => TypeRef::named(TypeRef::INT),
480        DimType::Float => TypeRef::named(TypeRef::FLOAT),
481        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
482    }
483}
484
485fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
486    match dt {
487        DimType::String => "StringFilter",
488        DimType::Int => "IntFilter",
489        DimType::Float => "FloatFilter",
490        DimType::DateTime => "DateTimeFilter",
491        DimType::Bool => "BoolFilter",
492    }
493}
494
495pub fn json_to_gql_value(v: serde_json::Value) -> Value {
496    match v {
497        serde_json::Value::Null => Value::Null,
498        serde_json::Value::Bool(b) => Value::from(b),
499        serde_json::Value::Number(n) => {
500            if let Some(i) = n.as_i64() { Value::from(i) }
501            else if let Some(f) = n.as_f64() { Value::from(f) }
502            else { Value::from(n.to_string()) }
503        }
504        serde_json::Value::String(s) => Value::from(s),
505        _ => Value::from(v.to_string()),
506    }
507}
508
509/// Convert a ClickHouse DateTime value to ISO 8601 format.
510/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
511fn json_to_gql_datetime(v: serde_json::Value) -> Value {
512    match v {
513        serde_json::Value::String(s) => {
514            let iso = if s.contains('T') {
515                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
516            } else {
517                let replaced = s.replacen(' ', "T", 1);
518                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
519            };
520            Value::from(iso)
521        }
522        other => json_to_gql_value(other),
523    }
524}