Skip to main content

activecube_rs/schema/
generator.rs

1use std::sync::Arc;
2use async_graphql::dynamic::*;
3use async_graphql::Value;
4
5use crate::compiler;
6use crate::compiler::ir::SqlValue;
7use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
8use crate::cube::registry::CubeRegistry;
9use crate::response::RowMap;
10use crate::schema::filter_types;
11use crate::sql::dialect::SqlDialect;
12use crate::stats::{QueryStats, StatsCallback};
13
14/// Async function type that executes a compiled SQL query and returns rows.
15/// The service layer provides this — the library never touches a database directly.
16pub type QueryExecutor = Arc<
17    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
18        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
19    > + Send + Sync,
20>;
21
22/// Configuration for supported networks (chains) and optional stats collection.
23pub struct SchemaConfig {
24    pub networks: Vec<String>,
25    pub root_query_name: String,
26    /// Optional callback invoked after each cube query with execution metadata.
27    /// Used by application layer for billing, observability, etc.
28    pub stats_callback: Option<StatsCallback>,
29}
30
31impl Default for SchemaConfig {
32    fn default() -> Self {
33        Self {
34            networks: vec!["sol", "eth", "bsc"]
35                .into_iter().map(String::from).collect(),
36            root_query_name: "ChainStream".to_string(),
37            stats_callback: None,
38        }
39    }
40}
41
42/// Build a complete async-graphql dynamic schema from registry + dialect + executor.
43pub fn build_schema(
44    registry: CubeRegistry,
45    dialect: Arc<dyn SqlDialect>,
46    executor: QueryExecutor,
47    config: SchemaConfig,
48) -> Result<Schema, SchemaError> {
49    let mut builder = Schema::build("Query", None, None);
50
51    // Network enum
52    let mut network_enum = Enum::new("Network");
53    for net in &config.networks {
54        network_enum = network_enum.item(EnumItem::new(net));
55    }
56    builder = builder.register(network_enum);
57    builder = builder.register(filter_types::build_limit_input());
58
59    for input in filter_types::build_filter_primitives() {
60        builder = builder.register(input);
61    }
62
63    // Cubes are top-level Query fields, each with a required `network` argument.
64    // Query pattern: `query { DEXTrades(network: sol, limit: ...) { ... } }`
65    let mut query = Object::new("Query");
66
67    for cube in registry.cubes() {
68        let types = build_cube_types(cube);
69        for obj in types.objects { builder = builder.register(obj); }
70        for inp in types.inputs { builder = builder.register(inp); }
71        for en in types.enums { builder = builder.register(en); }
72
73        let cube_name = cube.name.clone();
74        let dialect_clone = dialect.clone();
75        let executor_clone = executor.clone();
76        let stats_cb = config.stats_callback.clone();
77
78        let mut field = Field::new(
79            &cube.name,
80            TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
81            move |ctx| {
82                let cube_name = cube_name.clone();
83                let dialect = dialect_clone.clone();
84                let executor = executor_clone.clone();
85                let stats_cb = stats_cb.clone();
86                FieldFuture::new(async move {
87                    let registry = ctx.ctx.data::<CubeRegistry>()?;
88                    let network_val = ctx.args.try_get("network")?;
89                    let network = network_val.enum_name()
90                        .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;
91
92                    let cube_def = registry.get(&cube_name).ok_or_else(|| {
93                        async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
94                    })?;
95
96                    let metric_requests = extract_metric_requests(&ctx, cube_def);
97                    let ir = compiler::parser::parse_cube_query(cube_def, network, &ctx.args, &metric_requests)?;
98                    let validated = compiler::validator::validate(ir)?;
99                    let (sql, bindings) = dialect.compile(&validated);
100
101                    let rows = executor(sql.clone(), bindings).await.map_err(|e| {
102                        async_graphql::Error::new(format!("Query execution failed: {e}"))
103                    })?;
104
105                    // Per-request callback (injected via request.data()) takes precedence
106                    // over the global SchemaConfig callback.
107                    let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
108                        .or_else(|| stats_cb.clone());
109                    if let Some(cb) = effective_cb {
110                        let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
111                        cb(stats);
112                    }
113
114                    let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
115                    Ok(Some(FieldValue::list(values)))
116                })
117            },
118        )
119        .argument(InputValue::new("network", TypeRef::named_nn("Network")))
120        .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name))))
121        .argument(InputValue::new("limit", TypeRef::named("LimitInput")))
122        .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name))));
123
124        for sel in &cube.selectors {
125            let filter_type = dim_type_to_filter_name(&sel.dim_type);
126            field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type)));
127        }
128
129        query = query.field(field);
130    }
131
132    builder = builder.register(query);
133    builder = builder.data(registry);
134
135    builder.finish()
136}
137
138/// Extract metric requests from the GraphQL selection set by inspecting
139/// child fields. If a user selects `count(of: "Trade_Buy_Amount")`, we find
140/// the "count" field in the selection set and extract its `of` argument.
141fn extract_metric_requests(
142    ctx: &async_graphql::dynamic::ResolverContext,
143    cube: &CubeDefinition,
144) -> Vec<compiler::parser::MetricRequest> {
145    let mut requests = Vec::new();
146
147    for sub_field in ctx.ctx.field().selection_set() {
148        let name = sub_field.name();
149        if !cube.metrics.contains(&name.to_string()) {
150            continue;
151        }
152
153        let args = match sub_field.arguments() {
154            Ok(args) => args,
155            Err(_) => continue,
156        };
157
158        let of_dimension = args
159            .iter()
160            .find(|(k, _)| k.as_str() == "of")
161            .and_then(|(_, v)| match v {
162                async_graphql::Value::Enum(e) => Some(e.to_string()),
163                async_graphql::Value::String(s) => Some(s.clone()),
164                _ => None,
165            })
166            .unwrap_or_else(|| "*".to_string());
167
168        let select_where_value = args
169            .iter()
170            .find(|(k, _)| k.as_str() == "selectWhere")
171            .map(|(_, v)| v.clone());
172
173        requests.push(compiler::parser::MetricRequest {
174            function: name.to_string(),
175            of_dimension,
176            select_where_value,
177        });
178    }
179
180    requests
181}
182
183// ---------------------------------------------------------------------------
184// Per-Cube GraphQL type generation
185// ---------------------------------------------------------------------------
186
187struct CubeTypes {
188    objects: Vec<Object>,
189    inputs: Vec<InputObject>,
190    enums: Vec<Enum>,
191}
192
193fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
194    let record_name = format!("{}Record", cube.name);
195    let filter_name = format!("{}Filter", cube.name);
196    let orderby_name = format!("{}OrderBy", cube.name);
197
198    let mut record_fields: Vec<Field> = Vec::new();
199    let mut filter_fields: Vec<InputValue> = Vec::new();
200    let mut orderby_items: Vec<String> = Vec::new();
201    let mut extra_objects: Vec<Object> = Vec::new();
202    let mut extra_inputs: Vec<InputObject> = Vec::new();
203
204    filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name)));
205
206    {
207        let mut collector = DimCollector {
208            cube_name: &cube.name,
209            record_fields: &mut record_fields,
210            filter_fields: &mut filter_fields,
211            orderby_items: &mut orderby_items,
212            extra_objects: &mut extra_objects,
213            extra_inputs: &mut extra_inputs,
214        };
215        for node in &cube.dimensions {
216            collect_dimension_types(node, "", &mut collector);
217        }
218    }
219
220    let flat_dims = cube.flat_dimensions();
221    let mut metric_enums: Vec<Enum> = Vec::new();
222    for metric in &cube.metrics {
223        let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
224        extra_inputs.push(
225            InputObject::new(&select_where_name)
226                .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
227                .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)))
228                .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
229                .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)))
230                .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))),
231        );
232
233        let of_enum_name = format!("{}_{}_Of", cube.name, metric);
234        let mut of_enum = Enum::new(&of_enum_name);
235        for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
236        metric_enums.push(of_enum);
237
238        let metric_clone = metric.clone();
239        let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
240            let metric_key = metric_clone.clone();
241            FieldFuture::new(async move {
242                let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
243                let key = format!("__{metric_key}");
244                let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
245                Ok(Some(FieldValue::value(json_to_gql_value(val))))
246            })
247        })
248        .argument(InputValue::new("of", TypeRef::named(&of_enum_name)))
249        .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name)));
250
251        record_fields.push(metric_field);
252    }
253
254    let mut record = Object::new(&record_name);
255    for f in record_fields { record = record.field(f); }
256
257    let mut filter = InputObject::new(&filter_name);
258    for f in filter_fields { filter = filter.field(f); }
259
260    let mut orderby = Enum::new(&orderby_name);
261    for item in orderby_items { orderby = orderby.item(EnumItem::new(item)); }
262
263    let mut objects = vec![record]; objects.extend(extra_objects);
264    let mut inputs = vec![filter]; inputs.extend(extra_inputs);
265    let mut enums = vec![orderby]; enums.extend(metric_enums);
266
267    CubeTypes { objects, inputs, enums }
268}
269
270struct DimCollector<'a> {
271    cube_name: &'a str,
272    record_fields: &'a mut Vec<Field>,
273    filter_fields: &'a mut Vec<InputValue>,
274    orderby_items: &'a mut Vec<String>,
275    extra_objects: &'a mut Vec<Object>,
276    extra_inputs: &'a mut Vec<InputObject>,
277}
278
279fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
280    match node {
281        DimensionNode::Leaf(dim) => {
282            let col = dim.column.clone();
283            let leaf_field = Field::new(
284                &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
285                move |ctx| {
286                    let col = col.clone();
287                    FieldFuture::new(async move {
288                        let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
289                        let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
290                        Ok(Some(FieldValue::value(json_to_gql_value(val))))
291                    })
292                },
293            );
294            c.record_fields.push(leaf_field);
295            c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
296
297            let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
298            c.orderby_items.push(format!("{path}_ASC"));
299            c.orderby_items.push(format!("{path}_DESC"));
300        }
301        DimensionNode::Group { graphql_name, children } => {
302            let nested_record_name = format!("{}_{graphql_name}_Record", c.cube_name);
303            let nested_filter_name = format!("{}_{graphql_name}_Filter", c.cube_name);
304
305            let mut child_record_fields: Vec<Field> = Vec::new();
306            let mut child_filter_fields: Vec<InputValue> = Vec::new();
307            let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
308
309            let mut child_collector = DimCollector {
310                cube_name: c.cube_name,
311                record_fields: &mut child_record_fields,
312                filter_fields: &mut child_filter_fields,
313                orderby_items: c.orderby_items,
314                extra_objects: c.extra_objects,
315                extra_inputs: c.extra_inputs,
316            };
317            for child in children {
318                collect_dimension_types(child, &new_prefix, &mut child_collector);
319            }
320
321            let mut nested_record = Object::new(&nested_record_name);
322            for f in child_record_fields { nested_record = nested_record.field(f); }
323
324            let mut nested_filter = InputObject::new(&nested_filter_name);
325            for f in child_filter_fields { nested_filter = nested_filter.field(f); }
326
327            let group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
328                FieldFuture::new(async move {
329                    let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
330                    Ok(Some(FieldValue::owned_any(row.clone())))
331                })
332            });
333            c.record_fields.push(group_field);
334            c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
335            c.extra_objects.push(nested_record);
336            c.extra_inputs.push(nested_filter);
337        }
338    }
339}
340
341fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
342    match dt {
343        DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
344        DimType::Int => TypeRef::named(TypeRef::INT),
345        DimType::Float => TypeRef::named(TypeRef::FLOAT),
346        DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
347    }
348}
349
350fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
351    match dt {
352        DimType::String => "StringFilter",
353        DimType::Int => "IntFilter",
354        DimType::Float => "FloatFilter",
355        DimType::DateTime => "DateTimeFilter",
356        DimType::Bool => "BoolFilter",
357    }
358}
359
360pub fn json_to_gql_value(v: serde_json::Value) -> Value {
361    match v {
362        serde_json::Value::Null => Value::Null,
363        serde_json::Value::Bool(b) => Value::from(b),
364        serde_json::Value::Number(n) => {
365            if let Some(i) = n.as_i64() { Value::from(i) }
366            else if let Some(f) = n.as_f64() { Value::from(f) }
367            else { Value::from(n.to_string()) }
368        }
369        serde_json::Value::String(s) => Value::from(s),
370        _ => Value::from(v.to_string()),
371    }
372}