Skip to main content

activecube_rs/schema/
generator.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::SqlValue;
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
15/// Async function type that executes a compiled SQL query and returns rows.
16/// The service layer provides this — the library never touches a database directly.
/// Async function type that executes a compiled SQL query and returns rows.
/// The service layer provides this — the library never touches a database directly.
///
/// Called with the compiled SQL text and its positional bind values; resolves
/// to the result rows, or a `String` error message on failure. Must be
/// `Send + Sync` because it is cloned into per-field resolver closures.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
22
23/// Configuration for supported networks (chains) and optional stats collection.
/// Configuration for supported networks (chains) and optional stats collection.
pub struct SchemaConfig {
    // Chains exposed as items of the generated `Network` enum (e.g. "sol").
    pub networks: Vec<String>,
    // NOTE(review): not referenced by `build_schema`, which hardcodes the root
    // type name to "Query" — confirm whether this field should be wired in.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    pub stats_callback: Option<StatsCallback>,
}
31
32impl Default for SchemaConfig {
33    fn default() -> Self {
34        Self {
35            networks: vec!["sol", "eth", "bsc"]
36                .into_iter().map(String::from).collect(),
37            root_query_name: "ChainStream".to_string(),
38            stats_callback: None,
39        }
40    }
41}
42
43/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
44pub fn build_schema(
45    registry: CubeRegistry,
46    dialect: Arc<dyn SqlDialect>,
47    executor: QueryExecutor,
48    config: SchemaConfig,
49) -> Result<Schema, SchemaError> {
50    let mut builder = Schema::build("Query", None, None);
51
52    // Network enum
53    let mut network_enum = Enum::new("Network");
54    for net in &config.networks {
55        network_enum = network_enum.item(EnumItem::new(net));
56    }
57    builder = builder.register(network_enum);
58    builder = builder.register(filter_types::build_limit_input());
59
60    // Global OrderDirection enum for multi-column orderBy
61    builder = builder.register(
62        Enum::new("OrderDirection")
63            .item(EnumItem::new("ASC"))
64            .item(EnumItem::new("DESC")),
65    );
66
67    for input in filter_types::build_filter_primitives() {
68        builder = builder.register(input);
69    }
70
71    // Cubes are top-level Query fields, each with a required `network` argument.
72    // Query pattern: `query { DEXTrades(network: sol, limit: ...) { ... } }`
73    let mut query = Object::new("Query");
74
75    for cube in registry.cubes() {
76        let types = build_cube_types(cube);
77        for obj in types.objects { builder = builder.register(obj); }
78        for inp in types.inputs { builder = builder.register(inp); }
79        for en in types.enums { builder = builder.register(en); }
80
81        let cube_name = cube.name.clone();
82        let dialect_clone = dialect.clone();
83        let executor_clone = executor.clone();
84        let stats_cb = config.stats_callback.clone();
85
86        let orderby_list_input_name = format!("{}OrderByInput", cube.name);
87
88        let mut field = Field::new(
89            &cube.name,
90            TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
91            move |ctx| {
92                let cube_name = cube_name.clone();
93                let dialect = dialect_clone.clone();
94                let executor = executor_clone.clone();
95                let stats_cb = stats_cb.clone();
96                FieldFuture::new(async move {
97                    let registry = ctx.ctx.data::<CubeRegistry>()?;
98                    let network_val = ctx.args.try_get("network")?;
99                    let network = network_val.enum_name()
100                        .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;
101
102                    let cube_def = registry.get(&cube_name).ok_or_else(|| {
103                        async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
104                    })?;
105
106                    let metric_requests = extract_metric_requests(&ctx, cube_def);
107                    let requested = extract_requested_fields(&ctx, cube_def);
108                    let ir = compiler::parser::parse_cube_query(
109                        cube_def,
110                        network,
111                        &ctx.args,
112                        &metric_requests,
113                        Some(requested),
114                    )?;
115                    let validated = compiler::validator::validate(ir)?;
116                    let (sql, bindings) = dialect.compile(&validated);
117
118                    let rows = executor(sql.clone(), bindings).await.map_err(|e| {
119                        async_graphql::Error::new(format!("Query execution failed: {e}"))
120                    })?;
121
122                    let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
123                        .or_else(|| stats_cb.clone());
124                    if let Some(cb) = effective_cb {
125                        let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
126                        cb(stats);
127                    }
128
129                    let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
130                    Ok(Some(FieldValue::list(values)))
131                })
132            },
133        )
134        .argument(InputValue::new("network", TypeRef::named_nn("Network")))
135        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name))))
136        .argument(InputValue::new("limit", TypeRef::named("LimitInput")))
137        .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name))))
138        .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name)));
139
140        for sel in &cube.selectors {
141            let filter_type = dim_type_to_filter_name(&sel.dim_type);
142            field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type)));
143        }
144
145        query = query.field(field);
146    }
147
148    // Feature 3: _cubeMetadata introspection field
149    let metadata_registry = Arc::new(registry.clone());
150    let metadata_field = Field::new(
151        "_cubeMetadata",
152        TypeRef::named_nn(TypeRef::STRING),
153        move |_ctx| {
154            let reg = metadata_registry.clone();
155            FieldFuture::new(async move {
156                let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
157                    serde_json::json!({
158                        "name": cube.name,
159                        "schema": cube.schema,
160                        "tablePattern": cube.table_pattern,
161                        "metrics": cube.metrics,
162                        "selectors": cube.selectors.iter().map(|s| {
163                            serde_json::json!({
164                                "name": s.graphql_name,
165                                "column": s.column,
166                                "type": format!("{:?}", s.dim_type),
167                            })
168                        }).collect::<Vec<_>>(),
169                        "dimensions": serialize_dims(&cube.dimensions),
170                        "defaultLimit": cube.default_limit,
171                        "maxLimit": cube.max_limit,
172                    })
173                }).collect();
174                let json = serde_json::to_string(&metadata).unwrap_or_default();
175                Ok(Some(FieldValue::value(Value::from(json))))
176            })
177        },
178    );
179    query = query.field(metadata_field);
180
181    builder = builder.register(query);
182    builder = builder.data(registry);
183
184    builder.finish()
185}
186
187fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
188    serde_json::Value::Array(dims.iter().map(|d| match d {
189        DimensionNode::Leaf(dim) => serde_json::json!({
190            "name": dim.graphql_name,
191            "column": dim.column,
192            "type": format!("{:?}", dim.dim_type),
193        }),
194        DimensionNode::Group { graphql_name, children } => serde_json::json!({
195            "name": graphql_name,
196            "children": serialize_dims(children),
197        }),
198    }).collect())
199}
200
201/// Extract metric requests from the GraphQL selection set by inspecting
202/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
203/// the "count" field in the selection set and extract its `of` argument.
204fn extract_metric_requests(
205    ctx: &async_graphql::dynamic::ResolverContext,
206    cube: &CubeDefinition,
207) -> Vec<compiler::parser::MetricRequest> {
208    let mut requests = Vec::new();
209
210    for sub_field in ctx.ctx.field().selection_set() {
211        let name = sub_field.name();
212        if !cube.metrics.contains(&name.to_string()) {
213            continue;
214        }
215
216        let args = match sub_field.arguments() {
217            Ok(args) => args,
218            Err(_) => continue,
219        };
220
221        let of_dimension = args
222            .iter()
223            .find(|(k, _)| k.as_str() == "of")
224            .and_then(|(_, v)| match v {
225                async_graphql::Value::Enum(e) => Some(e.to_string()),
226                async_graphql::Value::String(s) => Some(s.clone()),
227                _ => None,
228            })
229            .unwrap_or_else(|| "*".to_string());
230
231        let select_where_value = args
232            .iter()
233            .find(|(k, _)| k.as_str() == "selectWhere")
234            .map(|(_, v)| v.clone());
235
236        let condition_filter = args
237            .iter()
238            .find(|(k, _)| k.as_str() == "if")
239            .and_then(|(_, v)| {
240                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
241                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
242            });
243
244        requests.push(compiler::parser::MetricRequest {
245            function: name.to_string(),
246            of_dimension,
247            select_where_value,
248            condition_filter,
249        });
250    }
251
252    requests
253}
254
255fn extract_requested_fields(
256    ctx: &async_graphql::dynamic::ResolverContext,
257    cube: &CubeDefinition,
258) -> HashSet<String> {
259    let mut fields = HashSet::new();
260    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
261    fields
262}
263
264fn collect_selection_paths(
265    field: &async_graphql::SelectionField<'_>,
266    prefix: &str,
267    out: &mut HashSet<String>,
268    metrics: &[String],
269) {
270    for sub in field.selection_set() {
271        let name = sub.name();
272        if metrics.iter().any(|m| m == name) {
273            continue;
274        }
275        let path = if prefix.is_empty() {
276            name.to_string()
277        } else {
278            format!("{prefix}_{name}")
279        };
280        let has_children = sub.selection_set().next().is_some();
281        if has_children {
282            collect_selection_paths(&sub, &path, out, metrics);
283        } else {
284            out.insert(path);
285        }
286    }
287}
288
289// ---------------------------------------------------------------------------
290// Per-Cube GraphQL type generation
291// ---------------------------------------------------------------------------
292
// All GraphQL types generated for a single cube, returned by build_cube_types
// and registered into the schema builder by build_schema.
struct CubeTypes {
    // Record object plus nested group record objects.
    objects: Vec<Object>,
    // Filter input, orderBy input, selectWhere inputs, nested group filters.
    inputs: Vec<InputObject>,
    // orderBy enum, orderBy field enum, per-metric `of` enums.
    enums: Vec<Enum>,
}
298
299fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
300    let record_name = format!("{}Record", cube.name);
301    let filter_name = format!("{}Filter", cube.name);
302    let orderby_name = format!("{}OrderBy", cube.name);
303
304    let mut record_fields: Vec<Field> = Vec::new();
305    let mut filter_fields: Vec<InputValue> = Vec::new();
306    let mut orderby_items: Vec<String> = Vec::new();
307    let mut extra_objects: Vec<Object> = Vec::new();
308    let mut extra_inputs: Vec<InputObject> = Vec::new();
309
310    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name)));
311
312    {
313        let mut collector = DimCollector {
314            cube_name: &cube.name,
315            record_fields: &mut record_fields,
316            filter_fields: &mut filter_fields,
317            orderby_items: &mut orderby_items,
318            extra_objects: &mut extra_objects,
319            extra_inputs: &mut extra_inputs,
320        };
321        for node in &cube.dimensions {
322            collect_dimension_types(node, "", &mut collector);
323        }
324    }
325
326    let flat_dims = cube.flat_dimensions();
327    let mut metric_enums: Vec<Enum> = Vec::new();
328    for metric in &cube.metrics {
329        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
330        extra_inputs.push(
331            InputObject::new(&select_where_name)
332                .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
333                .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)))
334                .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
335                .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)))
336                .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))),
337        );
338
339        let of_enum_name = format!("{}_{}_Of", cube.name, metric);
340        let mut of_enum = Enum::new(&of_enum_name);
341        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
342        metric_enums.push(of_enum);
343
344        let metric_clone = metric.clone();
345        let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
346            let metric_key = metric_clone.clone();
347            FieldFuture::new(async move {
348                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
349                let key = format!("__{metric_key}");
350                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
351                Ok(Some(FieldValue::value(json_to_gql_value(val))))
352            })
353        })
354        .argument(InputValue::new("of", TypeRef::named(&of_enum_name)))
355        .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name)))
356        .argument(InputValue::new("if", TypeRef::named(&filter_name)));
357
358        record_fields.push(metric_field);
359    }
360
361    let mut record = Object::new(&record_name);
362    for f in record_fields { record = record.field(f); }
363
364    let mut filter = InputObject::new(&filter_name);
365    for f in filter_fields { filter = filter.field(f); }
366
367    let mut orderby = Enum::new(&orderby_name);
368    for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
369
370    // Multi-column orderBy: {Cube}OrderBy_Field enum + {Cube}OrderByInput
371    let field_enum_name = format!("{}_Field", orderby_name);
372    let orderby_input_name = format!("{}OrderByInput", cube.name);
373    let mut field_enum = Enum::new(&field_enum_name);
374    let flat_dims = cube.flat_dimensions();
375    for (path, _) in &flat_dims {
376        field_enum = field_enum.item(EnumItem::new(path));
377    }
378    let orderby_input = InputObject::new(&orderby_input_name)
379        .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name)))
380        .field(InputValue::new("direction", TypeRef::named("OrderDirection")));
381
382    let mut objects = vec![record]; objects.extend(extra_objects);
383    let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
384    let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
385
386    CubeTypes { objects, inputs, enums }
387}
388
// Mutable accumulators threaded through the dimension-tree walk in
// collect_dimension_types. Nested groups swap in their own record/filter
// buffers while sharing orderBy and extra-type buffers with the parent.
struct DimCollector<'a> {
    // Used to namespace nested group type names.
    cube_name: &'a str,
    record_fields: &'a mut Vec<Field>,
    filter_fields: &'a mut Vec<InputValue>,
    orderby_items: &'a mut Vec<String>,
    extra_objects: &'a mut Vec<Object>,
    extra_inputs: &'a mut Vec<InputObject>,
}
397
398fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
399    match node {
400        DimensionNode::Leaf(dim) => {
401            let col = dim.column.clone();
402            let is_datetime = dim.dim_type == DimType::DateTime;
403            let leaf_field = Field::new(
404                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
405                move |ctx| {
406                    let col = col.clone();
407                    FieldFuture::new(async move {
408                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
409                        let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
410                        let gql_val = if is_datetime {
411                            json_to_gql_datetime(val)
412                        } else {
413                            json_to_gql_value(val)
414                        };
415                        Ok(Some(FieldValue::value(gql_val)))
416                    })
417                },
418            );
419            c.record_fields.push(leaf_field);
420            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
421
422            let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
423            c.orderby_items.push(format!("{path}_ASC"));
424            c.orderby_items.push(format!("{path}_DESC"));
425        }
426        DimensionNode::Group { graphql_name, children } => {
427            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
428            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
429            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
430
431            let mut child_record_fields: Vec<Field> = Vec::new();
432            let mut child_filter_fields: Vec<InputValue> = Vec::new();
433            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
434
435            let mut child_collector = DimCollector {
436                cube_name: c.cube_name,
437                record_fields: &mut child_record_fields,
438                filter_fields: &mut child_filter_fields,
439                orderby_items: c.orderby_items,
440                extra_objects: c.extra_objects,
441                extra_inputs: c.extra_inputs,
442            };
443            for child in children {
444                collect_dimension_types(child, &new_prefix, &mut child_collector);
445            }
446
447            let mut nested_record = Object::new(&nested_record_name);
448            for f in child_record_fields { nested_record = nested_record.field(f); }
449
450            let mut nested_filter = InputObject::new(&nested_filter_name);
451            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
452
453            let group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
454                FieldFuture::new(async move {
455                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
456                    Ok(Some(FieldValue::owned_any(row.clone())))
457                })
458            });
459            c.record_fields.push(group_field);
460            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
461            c.extra_objects.push(nested_record);
462            c.extra_inputs.push(nested_filter);
463        }
464    }
465}
466
467fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
468    match dt {
469        DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
470        DimType::Int => TypeRef::named(TypeRef::INT),
471        DimType::Float => TypeRef::named(TypeRef::FLOAT),
472        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
473    }
474}
475
476fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
477    match dt {
478        DimType::String => "StringFilter",
479        DimType::Int => "IntFilter",
480        DimType::Float => "FloatFilter",
481        DimType::DateTime => "DateTimeFilter",
482        DimType::Bool => "BoolFilter",
483    }
484}
485
486pub fn json_to_gql_value(v: serde_json::Value) -> Value {
487    match v {
488        serde_json::Value::Null => Value::Null,
489        serde_json::Value::Bool(b) => Value::from(b),
490        serde_json::Value::Number(n) => {
491            if let Some(i) = n.as_i64() { Value::from(i) }
492            else if let Some(f) = n.as_f64() { Value::from(f) }
493            else { Value::from(n.to_string()) }
494        }
495        serde_json::Value::String(s) => Value::from(s),
496        _ => Value::from(v.to_string()),
497    }
498}
499
500/// Convert a ClickHouse DateTime value to ISO 8601 format.
501/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
502fn json_to_gql_datetime(v: serde_json::Value) -> Value {
503    match v {
504        serde_json::Value::String(s) => {
505            let iso = if s.contains('T') {
506                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
507            } else {
508                let replaced = s.replacen(' ', "T", 1);
509                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
510            };
511            Value::from(iso)
512        }
513        other => json_to_gql_value(other),
514    }
515}