Skip to main content

activecube_rs/schema/
generator.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::SqlValue;
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Async function type that executes a compiled SQL query and returns rows.
/// The service layer provides this — the library never touches a database directly.
///
/// The first argument is the rendered SQL text, the second its positional
/// bind parameters; the future resolves to the result rows or a string
/// error message (surfaced to clients as "Query execution failed: …").
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
22
/// Configuration for supported networks (chains) and optional stats collection.
pub struct SchemaConfig {
    /// Networks exposed as variants of the generated `Network` enum
    /// (e.g. "sol", "eth", "bsc").
    pub networks: Vec<String>,
    /// Intended name for the root query type.
    /// NOTE(review): `build_schema` hard-codes "Query" as the root object
    /// name and never reads this field — confirm whether another consumer
    /// uses it or the wiring is missing.
    pub root_query_name: String,
    /// Optional callback invoked after each cube query with execution metadata.
    /// Used by application layer for billing, observability, etc.
    pub stats_callback: Option<StatsCallback>,
}
31
32impl Default for SchemaConfig {
33    fn default() -> Self {
34        Self {
35            networks: vec!["sol", "eth", "bsc"]
36                .into_iter().map(String::from).collect(),
37            root_query_name: "ChainStream".to_string(),
38            stats_callback: None,
39        }
40    }
41}
42
/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
pub fn build_schema(
    registry: CubeRegistry,
    dialect: Arc<dyn SqlDialect>,
    executor: QueryExecutor,
    config: SchemaConfig,
) -> Result<Schema, SchemaError> {
    // NOTE(review): the root object name is hard-coded to "Query" here and
    // in `Object::new("Query")` below, while `config.root_query_name`
    // (default "ChainStream") is never read — confirm which is intended.
    let mut builder = Schema::build("Query", None, None);

    // Network enum
    let mut network_enum = Enum::new("Network");
    for net in &config.networks {
        network_enum = network_enum.item(EnumItem::new(net));
    }
    builder = builder.register(network_enum);
    builder = builder.register(filter_types::build_limit_input());

    // Shared LIMIT BY input: per-group row limiting keyed by column name.
    builder = builder.register(
        InputObject::new("LimitByInput")
            .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING)))
            .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)))
            .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))),
    );

    // Global OrderDirection enum for multi-column orderBy
    builder = builder.register(
        Enum::new("OrderDirection")
            .item(EnumItem::new("ASC"))
            .item(EnumItem::new("DESC")),
    );

    // Shared scalar filter inputs (StringFilter, IntFilter, ...).
    for input in filter_types::build_filter_primitives() {
        builder = builder.register(input);
    }

    // Cubes are top-level Query fields, each with a required `network` argument.
    // Query pattern: `query { DEXTrades(network: sol, limit: ...) { ... } }`
    let mut query = Object::new("Query");

    for cube in registry.cubes() {
        // Register every generated per-cube type before wiring the field.
        let types = build_cube_types(cube);
        for obj in types.objects { builder = builder.register(obj); }
        for inp in types.inputs { builder = builder.register(inp); }
        for en in types.enums { builder = builder.register(en); }

        // Clones moved into the per-cube resolver closure below.
        let cube_name = cube.name.clone();
        let dialect_clone = dialect.clone();
        let executor_clone = executor.clone();
        let stats_cb = config.stats_callback.clone();

        let orderby_list_input_name = format!("{}OrderByInput", cube.name);

        let mut field = Field::new(
            &cube.name,
            TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
            move |ctx| {
                // Per-invocation clones: the resolver closure is reusable,
                // so its captures cannot be moved into the async block.
                let cube_name = cube_name.clone();
                let dialect = dialect_clone.clone();
                let executor = executor_clone.clone();
                let stats_cb = stats_cb.clone();
                FieldFuture::new(async move {
                    let registry = ctx.ctx.data::<CubeRegistry>()?;
                    // `network` is declared non-null below; enum_name()
                    // rejects values that are not enum literals.
                    let network_val = ctx.args.try_get("network")?;
                    let network = network_val.enum_name()
                        .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;

                    let cube_def = registry.get(&cube_name).ok_or_else(|| {
                        async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
                    })?;

                    // Pipeline: GraphQL selection -> IR -> validation -> SQL.
                    let metric_requests = extract_metric_requests(&ctx, cube_def);
                    let requested = extract_requested_fields(&ctx, cube_def);
                    let ir = compiler::parser::parse_cube_query(
                        cube_def,
                        network,
                        &ctx.args,
                        &metric_requests,
                        Some(requested),
                    )?;
                    let validated = compiler::validator::validate(ir)?;
                    let (sql, bindings) = dialect.compile(&validated);

                    let rows = executor(sql.clone(), bindings).await.map_err(|e| {
                        async_graphql::Error::new(format!("Query execution failed: {e}"))
                    })?;

                    // A request-scoped callback stored in context data takes
                    // precedence over the schema-wide one from the config.
                    let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
                        .or_else(|| stats_cb.clone());
                    if let Some(cb) = effective_cb {
                        let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
                        cb(stats);
                    }

                    // Each row is wrapped as an opaque RowMap; leaf resolvers
                    // downcast it back to read individual columns.
                    let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
                    Ok(Some(FieldValue::list(values)))
                })
            },
        );
        field = field
            .argument(InputValue::new("network", TypeRef::named_nn("Network")))
            .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name))))
            .argument(InputValue::new("limit", TypeRef::named("LimitInput")))
            .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput")))
            .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name))))
            .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name)));

        // Cube-specific selector arguments: typed filters on key dimensions.
        for sel in &cube.selectors {
            let filter_type = dim_type_to_filter_name(&sel.dim_type);
            field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type)));
        }

        query = query.field(field);
    }

    // Feature 3: _cubeMetadata introspection field
    // Returns the full cube catalog as one JSON string, not structured GraphQL.
    let metadata_registry = Arc::new(registry.clone());
    let metadata_field = Field::new(
        "_cubeMetadata",
        TypeRef::named_nn(TypeRef::STRING),
        move |_ctx| {
            let reg = metadata_registry.clone();
            FieldFuture::new(async move {
                let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
                    serde_json::json!({
                        "name": cube.name,
                        "description": cube.description,
                        "schema": cube.schema,
                        "tablePattern": cube.table_pattern,
                        "metrics": cube.metrics,
                        "selectors": cube.selectors.iter().map(|s| {
                            serde_json::json!({
                                "name": s.graphql_name,
                                "column": s.column,
                                "type": format!("{:?}", s.dim_type),
                            })
                        }).collect::<Vec<_>>(),
                        "dimensions": serialize_dims(&cube.dimensions),
                        "defaultLimit": cube.default_limit,
                        "maxLimit": cube.max_limit,
                    })
                }).collect();
                let json = serde_json::to_string(&metadata).unwrap_or_default();
                Ok(Some(FieldValue::value(Value::from(json))))
            })
        },
    );
    query = query.field(metadata_field);

    builder = builder.register(query);
    // Stored in schema data so the per-cube resolvers can look cubes up.
    builder = builder.data(registry);

    builder.finish()
}
196
197fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
198    serde_json::Value::Array(dims.iter().map(|d| match d {
199        DimensionNode::Leaf(dim) => serde_json::json!({
200            "name": dim.graphql_name,
201            "column": dim.column,
202            "type": format!("{:?}", dim.dim_type),
203        }),
204        DimensionNode::Group { graphql_name, children } => serde_json::json!({
205            "name": graphql_name,
206            "children": serialize_dims(children),
207        }),
208    }).collect())
209}
210
211/// Extract metric requests from the GraphQL selection set by inspecting
212/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
213/// the "count" field in the selection set and extract its `of` argument.
214fn extract_metric_requests(
215    ctx: &async_graphql::dynamic::ResolverContext,
216    cube: &CubeDefinition,
217) -> Vec<compiler::parser::MetricRequest> {
218    let mut requests = Vec::new();
219
220    for sub_field in ctx.ctx.field().selection_set() {
221        let name = sub_field.name();
222        if !cube.metrics.contains(&name.to_string()) {
223            continue;
224        }
225
226        let args = match sub_field.arguments() {
227            Ok(args) => args,
228            Err(_) => continue,
229        };
230
231        let of_dimension = args
232            .iter()
233            .find(|(k, _)| k.as_str() == "of")
234            .and_then(|(_, v)| match v {
235                async_graphql::Value::Enum(e) => Some(e.to_string()),
236                async_graphql::Value::String(s) => Some(s.clone()),
237                _ => None,
238            })
239            .unwrap_or_else(|| "*".to_string());
240
241        let select_where_value = args
242            .iter()
243            .find(|(k, _)| k.as_str() == "selectWhere")
244            .map(|(_, v)| v.clone());
245
246        let condition_filter = args
247            .iter()
248            .find(|(k, _)| k.as_str() == "if")
249            .and_then(|(_, v)| {
250                compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
251                    .and_then(|f| if f.is_empty() { None } else { Some(f) })
252            });
253
254        requests.push(compiler::parser::MetricRequest {
255            function: name.to_string(),
256            of_dimension,
257            select_where_value,
258            condition_filter,
259        });
260    }
261
262    requests
263}
264
265fn extract_requested_fields(
266    ctx: &async_graphql::dynamic::ResolverContext,
267    cube: &CubeDefinition,
268) -> HashSet<String> {
269    let mut fields = HashSet::new();
270    collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
271    fields
272}
273
274fn collect_selection_paths(
275    field: &async_graphql::SelectionField<'_>,
276    prefix: &str,
277    out: &mut HashSet<String>,
278    metrics: &[String],
279) {
280    for sub in field.selection_set() {
281        let name = sub.name();
282        if metrics.iter().any(|m| m == name) {
283            continue;
284        }
285        let path = if prefix.is_empty() {
286            name.to_string()
287        } else {
288            format!("{prefix}_{name}")
289        };
290        let has_children = sub.selection_set().next().is_some();
291        if has_children {
292            collect_selection_paths(&sub, &path, out, metrics);
293        } else {
294            out.insert(path);
295        }
296    }
297}
298
299// ---------------------------------------------------------------------------
300// Per-Cube GraphQL type generation
301// ---------------------------------------------------------------------------
302
/// All GraphQL types generated for a single cube, ready for registration.
struct CubeTypes {
    /// Output objects: the `{Cube}Record` type plus nested group records.
    objects: Vec<Object>,
    /// Input objects: the cube filter, orderBy input, and per-metric
    /// selectWhere inputs.
    inputs: Vec<InputObject>,
    /// Enums: orderBy variants, the orderBy field enum, and per-metric
    /// `of` enums.
    enums: Vec<Enum>,
}
308
309fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
310    let record_name = format!("{}Record", cube.name);
311    let filter_name = format!("{}Filter", cube.name);
312    let orderby_name = format!("{}OrderBy", cube.name);
313
314    let mut record_fields: Vec<Field> = Vec::new();
315    let mut filter_fields: Vec<InputValue> = Vec::new();
316    let mut orderby_items: Vec<String> = Vec::new();
317    let mut extra_objects: Vec<Object> = Vec::new();
318    let mut extra_inputs: Vec<InputObject> = Vec::new();
319
320    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name)));
321
322    {
323        let mut collector = DimCollector {
324            cube_name: &cube.name,
325            record_fields: &mut record_fields,
326            filter_fields: &mut filter_fields,
327            orderby_items: &mut orderby_items,
328            extra_objects: &mut extra_objects,
329            extra_inputs: &mut extra_inputs,
330        };
331        for node in &cube.dimensions {
332            collect_dimension_types(node, "", &mut collector);
333        }
334    }
335
336    let flat_dims = cube.flat_dimensions();
337    let mut metric_enums: Vec<Enum> = Vec::new();
338    for metric in &cube.metrics {
339        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
340        extra_inputs.push(
341            InputObject::new(&select_where_name)
342                .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
343                .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)))
344                .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
345                .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)))
346                .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))),
347        );
348
349        let of_enum_name = format!("{}_{}_Of", cube.name, metric);
350        let mut of_enum = Enum::new(&of_enum_name);
351        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
352        metric_enums.push(of_enum);
353
354        let metric_clone = metric.clone();
355        let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
356            let metric_key = metric_clone.clone();
357            FieldFuture::new(async move {
358                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
359                let key = format!("__{metric_key}");
360                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
361                Ok(Some(FieldValue::value(json_to_gql_value(val))))
362            })
363        })
364        .argument(InputValue::new("of", TypeRef::named(&of_enum_name)))
365        .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name)))
366        .argument(InputValue::new("if", TypeRef::named(&filter_name)));
367
368        record_fields.push(metric_field);
369    }
370
371    let mut record = Object::new(&record_name);
372    for f in record_fields { record = record.field(f); }
373
374    let mut filter = InputObject::new(&filter_name);
375    for f in filter_fields { filter = filter.field(f); }
376
377    let mut orderby = Enum::new(&orderby_name);
378    for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
379
380    // Multi-column orderBy: {Cube}OrderBy_Field enum + {Cube}OrderByInput
381    let field_enum_name = format!("{}_Field", orderby_name);
382    let orderby_input_name = format!("{}OrderByInput", cube.name);
383    let mut field_enum = Enum::new(&field_enum_name);
384    let flat_dims = cube.flat_dimensions();
385    for (path, _) in &flat_dims {
386        field_enum = field_enum.item(EnumItem::new(path));
387    }
388    let orderby_input = InputObject::new(&orderby_input_name)
389        .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name)))
390        .field(InputValue::new("direction", TypeRef::named("OrderDirection")));
391
392    let mut objects = vec![record]; objects.extend(extra_objects);
393    let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
394    let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
395
396    CubeTypes { objects, inputs, enums }
397}
398
/// Mutable sinks used while walking a cube's dimension tree. Nested groups
/// build a child collector that redirects record/filter fields into the
/// group's own lists while still sharing the orderBy items and extra types
/// with the parent.
struct DimCollector<'a> {
    /// Owning cube's name, used to prefix generated nested type names.
    cube_name: &'a str,
    /// Output fields for the (possibly nested) record object being built.
    record_fields: &'a mut Vec<Field>,
    /// Input fields for the (possibly nested) filter object being built.
    filter_fields: &'a mut Vec<InputValue>,
    /// Flat `{path}_ASC` / `{path}_DESC` items for the cube's orderBy enum.
    orderby_items: &'a mut Vec<String>,
    /// Nested record objects generated for dimension groups.
    extra_objects: &'a mut Vec<Object>,
    /// Nested filter inputs generated for dimension groups.
    extra_inputs: &'a mut Vec<InputObject>,
}
407
408fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
409    match node {
410        DimensionNode::Leaf(dim) => {
411            let col = dim.column.clone();
412            let is_datetime = dim.dim_type == DimType::DateTime;
413            let leaf_field = Field::new(
414                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
415                move |ctx| {
416                    let col = col.clone();
417                    FieldFuture::new(async move {
418                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
419                        let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
420                        let gql_val = if is_datetime {
421                            json_to_gql_datetime(val)
422                        } else {
423                            json_to_gql_value(val)
424                        };
425                        Ok(Some(FieldValue::value(gql_val)))
426                    })
427                },
428            );
429            c.record_fields.push(leaf_field);
430            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
431
432            let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
433            c.orderby_items.push(format!("{path}_ASC"));
434            c.orderby_items.push(format!("{path}_DESC"));
435        }
436        DimensionNode::Group { graphql_name, children } => {
437            let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
438            let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
439            let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
440
441            let mut child_record_fields: Vec<Field> = Vec::new();
442            let mut child_filter_fields: Vec<InputValue> = Vec::new();
443            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
444
445            let mut child_collector = DimCollector {
446                cube_name: c.cube_name,
447                record_fields: &mut child_record_fields,
448                filter_fields: &mut child_filter_fields,
449                orderby_items: c.orderby_items,
450                extra_objects: c.extra_objects,
451                extra_inputs: c.extra_inputs,
452            };
453            for child in children {
454                collect_dimension_types(child, &new_prefix, &mut child_collector);
455            }
456
457            let mut nested_record = Object::new(&nested_record_name);
458            for f in child_record_fields { nested_record = nested_record.field(f); }
459
460            let mut nested_filter = InputObject::new(&nested_filter_name);
461            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
462
463            let group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
464                FieldFuture::new(async move {
465                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
466                    Ok(Some(FieldValue::owned_any(row.clone())))
467                })
468            });
469            c.record_fields.push(group_field);
470            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
471            c.extra_objects.push(nested_record);
472            c.extra_inputs.push(nested_filter);
473        }
474    }
475}
476
477fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
478    match dt {
479        DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
480        DimType::Int => TypeRef::named(TypeRef::INT),
481        DimType::Float => TypeRef::named(TypeRef::FLOAT),
482        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
483    }
484}
485
486fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
487    match dt {
488        DimType::String => "StringFilter",
489        DimType::Int => "IntFilter",
490        DimType::Float => "FloatFilter",
491        DimType::DateTime => "DateTimeFilter",
492        DimType::Bool => "BoolFilter",
493    }
494}
495
496pub fn json_to_gql_value(v: serde_json::Value) -> Value {
497    match v {
498        serde_json::Value::Null => Value::Null,
499        serde_json::Value::Bool(b) => Value::from(b),
500        serde_json::Value::Number(n) => {
501            if let Some(i) = n.as_i64() { Value::from(i) }
502            else if let Some(f) = n.as_f64() { Value::from(f) }
503            else { Value::from(n.to_string()) }
504        }
505        serde_json::Value::String(s) => Value::from(s),
506        _ => Value::from(v.to_string()),
507    }
508}
509
510/// Convert a ClickHouse DateTime value to ISO 8601 format.
511/// `"2026-03-27 19:06:41.000"` -> `"2026-03-27T19:06:41.000Z"`
512fn json_to_gql_datetime(v: serde_json::Value) -> Value {
513    match v {
514        serde_json::Value::String(s) => {
515            let iso = if s.contains('T') {
516                if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
517            } else {
518                let replaced = s.replacen(' ', "T", 1);
519                if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
520            };
521            Value::from(iso)
522        }
523        other => json_to_gql_value(other),
524    }
525}