1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::SqlValue;
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
/// Shared, thread-safe handle to the async function that executes compiled SQL.
///
/// The callable receives the SQL text and its bound parameter values and
/// returns a boxed, `Send` future resolving to the result rows, or to a
/// `String` error message on failure.
pub type QueryExecutor = Arc<
    dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
    > + Send + Sync,
>;
22
/// Options controlling how `build_schema` assembles the GraphQL schema.
pub struct SchemaConfig {
    /// Names registered as the items of the `Network` GraphQL enum.
    pub networks: Vec<String>,
    /// Name for the root query.
    ///
    /// NOTE(review): not read anywhere in the code visible in this file —
    /// `build_schema` names the root object `Query`. Confirm intended use.
    pub root_query_name: String,
    /// Fallback statistics callback, used by cube resolvers when the request
    /// context carries no `StatsCallback` of its own.
    pub stats_callback: Option<StatsCallback>,
}
31
32impl Default for SchemaConfig {
33 fn default() -> Self {
34 Self {
35 networks: vec!["sol", "eth", "bsc"]
36 .into_iter().map(String::from).collect(),
37 root_query_name: "ChainStream".to_string(),
38 stats_callback: None,
39 }
40 }
41}
42
43pub fn build_schema(
45 registry: CubeRegistry,
46 dialect: Arc<dyn SqlDialect>,
47 executor: QueryExecutor,
48 config: SchemaConfig,
49) -> Result<Schema, SchemaError> {
50 let mut builder = Schema::build("Query", None, None);
51
52 let mut network_enum = Enum::new("Network");
54 for net in &config.networks {
55 network_enum = network_enum.item(EnumItem::new(net));
56 }
57 builder = builder.register(network_enum);
58 builder = builder.register(filter_types::build_limit_input());
59
60 builder = builder.register(
62 Enum::new("OrderDirection")
63 .item(EnumItem::new("ASC"))
64 .item(EnumItem::new("DESC")),
65 );
66
67 for input in filter_types::build_filter_primitives() {
68 builder = builder.register(input);
69 }
70
71 let mut query = Object::new("Query");
74
75 for cube in registry.cubes() {
76 let types = build_cube_types(cube);
77 for obj in types.objects { builder = builder.register(obj); }
78 for inp in types.inputs { builder = builder.register(inp); }
79 for en in types.enums { builder = builder.register(en); }
80
81 let cube_name = cube.name.clone();
82 let dialect_clone = dialect.clone();
83 let executor_clone = executor.clone();
84 let stats_cb = config.stats_callback.clone();
85
86 let orderby_list_input_name = format!("{}OrderByInput", cube.name);
87
88 let mut field = Field::new(
89 &cube.name,
90 TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
91 move |ctx| {
92 let cube_name = cube_name.clone();
93 let dialect = dialect_clone.clone();
94 let executor = executor_clone.clone();
95 let stats_cb = stats_cb.clone();
96 FieldFuture::new(async move {
97 let registry = ctx.ctx.data::<CubeRegistry>()?;
98 let network_val = ctx.args.try_get("network")?;
99 let network = network_val.enum_name()
100 .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;
101
102 let cube_def = registry.get(&cube_name).ok_or_else(|| {
103 async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
104 })?;
105
106 let metric_requests = extract_metric_requests(&ctx, cube_def);
107 let requested = extract_requested_fields(&ctx, cube_def);
108 let ir = compiler::parser::parse_cube_query(
109 cube_def,
110 network,
111 &ctx.args,
112 &metric_requests,
113 Some(requested),
114 )?;
115 let validated = compiler::validator::validate(ir)?;
116 let (sql, bindings) = dialect.compile(&validated);
117
118 let rows = executor(sql.clone(), bindings).await.map_err(|e| {
119 async_graphql::Error::new(format!("Query execution failed: {e}"))
120 })?;
121
122 let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
123 .or_else(|| stats_cb.clone());
124 if let Some(cb) = effective_cb {
125 let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
126 cb(stats);
127 }
128
129 let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
130 Ok(Some(FieldValue::list(values)))
131 })
132 },
133 )
134 .argument(InputValue::new("network", TypeRef::named_nn("Network")))
135 .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name))))
136 .argument(InputValue::new("limit", TypeRef::named("LimitInput")))
137 .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name))))
138 .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name)));
139
140 for sel in &cube.selectors {
141 let filter_type = dim_type_to_filter_name(&sel.dim_type);
142 field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type)));
143 }
144
145 query = query.field(field);
146 }
147
148 let metadata_registry = Arc::new(registry.clone());
150 let metadata_field = Field::new(
151 "_cubeMetadata",
152 TypeRef::named_nn(TypeRef::STRING),
153 move |_ctx| {
154 let reg = metadata_registry.clone();
155 FieldFuture::new(async move {
156 let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
157 serde_json::json!({
158 "name": cube.name,
159 "schema": cube.schema,
160 "tablePattern": cube.table_pattern,
161 "metrics": cube.metrics,
162 "selectors": cube.selectors.iter().map(|s| {
163 serde_json::json!({
164 "name": s.graphql_name,
165 "column": s.column,
166 "type": format!("{:?}", s.dim_type),
167 })
168 }).collect::<Vec<_>>(),
169 "dimensions": serialize_dims(&cube.dimensions),
170 "defaultLimit": cube.default_limit,
171 "maxLimit": cube.max_limit,
172 })
173 }).collect();
174 let json = serde_json::to_string(&metadata).unwrap_or_default();
175 Ok(Some(FieldValue::value(Value::from(json))))
176 })
177 },
178 );
179 query = query.field(metadata_field);
180
181 builder = builder.register(query);
182 builder = builder.data(registry);
183
184 builder.finish()
185}
186
187fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
188 serde_json::Value::Array(dims.iter().map(|d| match d {
189 DimensionNode::Leaf(dim) => serde_json::json!({
190 "name": dim.graphql_name,
191 "column": dim.column,
192 "type": format!("{:?}", dim.dim_type),
193 }),
194 DimensionNode::Group { graphql_name, children } => serde_json::json!({
195 "name": graphql_name,
196 "children": serialize_dims(children),
197 }),
198 }).collect())
199}
200
201fn extract_metric_requests(
205 ctx: &async_graphql::dynamic::ResolverContext,
206 cube: &CubeDefinition,
207) -> Vec<compiler::parser::MetricRequest> {
208 let mut requests = Vec::new();
209
210 for sub_field in ctx.ctx.field().selection_set() {
211 let name = sub_field.name();
212 if !cube.metrics.contains(&name.to_string()) {
213 continue;
214 }
215
216 let args = match sub_field.arguments() {
217 Ok(args) => args,
218 Err(_) => continue,
219 };
220
221 let of_dimension = args
222 .iter()
223 .find(|(k, _)| k.as_str() == "of")
224 .and_then(|(_, v)| match v {
225 async_graphql::Value::Enum(e) => Some(e.to_string()),
226 async_graphql::Value::String(s) => Some(s.clone()),
227 _ => None,
228 })
229 .unwrap_or_else(|| "*".to_string());
230
231 let select_where_value = args
232 .iter()
233 .find(|(k, _)| k.as_str() == "selectWhere")
234 .map(|(_, v)| v.clone());
235
236 let condition_filter = args
237 .iter()
238 .find(|(k, _)| k.as_str() == "if")
239 .and_then(|(_, v)| {
240 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
241 .and_then(|f| if f.is_empty() { None } else { Some(f) })
242 });
243
244 requests.push(compiler::parser::MetricRequest {
245 function: name.to_string(),
246 of_dimension,
247 select_where_value,
248 condition_filter,
249 });
250 }
251
252 requests
253}
254
255fn extract_requested_fields(
256 ctx: &async_graphql::dynamic::ResolverContext,
257 cube: &CubeDefinition,
258) -> HashSet<String> {
259 let mut fields = HashSet::new();
260 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
261 fields
262}
263
264fn collect_selection_paths(
265 field: &async_graphql::SelectionField<'_>,
266 prefix: &str,
267 out: &mut HashSet<String>,
268 metrics: &[String],
269) {
270 for sub in field.selection_set() {
271 let name = sub.name();
272 if metrics.iter().any(|m| m == name) {
273 continue;
274 }
275 let path = if prefix.is_empty() {
276 name.to_string()
277 } else {
278 format!("{prefix}_{name}")
279 };
280 let has_children = sub.selection_set().next().is_some();
281 if has_children {
282 collect_selection_paths(&sub, &path, out, metrics);
283 } else {
284 out.insert(path);
285 }
286 }
287}
288
/// GraphQL types generated for a single cube by `build_cube_types`, grouped
/// by kind so that `build_schema` can register each of them.
struct CubeTypes {
    /// Output object types: the cube record plus nested group records.
    objects: Vec<Object>,
    /// Input object types: the cube filter, the order-by input, nested group
    /// filters and per-metric `SelectWhere` inputs.
    inputs: Vec<InputObject>,
    /// Enum types: the order-by enum, the order-by field enum and per-metric
    /// `Of` enums.
    enums: Vec<Enum>,
}
298
299fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
300 let record_name = format!("{}Record", cube.name);
301 let filter_name = format!("{}Filter", cube.name);
302 let orderby_name = format!("{}OrderBy", cube.name);
303
304 let mut record_fields: Vec<Field> = Vec::new();
305 let mut filter_fields: Vec<InputValue> = Vec::new();
306 let mut orderby_items: Vec<String> = Vec::new();
307 let mut extra_objects: Vec<Object> = Vec::new();
308 let mut extra_inputs: Vec<InputObject> = Vec::new();
309
310 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name)));
311
312 {
313 let mut collector = DimCollector {
314 cube_name: &cube.name,
315 record_fields: &mut record_fields,
316 filter_fields: &mut filter_fields,
317 orderby_items: &mut orderby_items,
318 extra_objects: &mut extra_objects,
319 extra_inputs: &mut extra_inputs,
320 };
321 for node in &cube.dimensions {
322 collect_dimension_types(node, "", &mut collector);
323 }
324 }
325
326 let flat_dims = cube.flat_dimensions();
327 let mut metric_enums: Vec<Enum> = Vec::new();
328 for metric in &cube.metrics {
329 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
330 extra_inputs.push(
331 InputObject::new(&select_where_name)
332 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
333 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)))
334 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
335 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)))
336 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))),
337 );
338
339 let of_enum_name = format!("{}_{}_Of", cube.name, metric);
340 let mut of_enum = Enum::new(&of_enum_name);
341 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
342 metric_enums.push(of_enum);
343
344 let metric_clone = metric.clone();
345 let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
346 let metric_key = metric_clone.clone();
347 FieldFuture::new(async move {
348 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
349 let key = format!("__{metric_key}");
350 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
351 Ok(Some(FieldValue::value(json_to_gql_value(val))))
352 })
353 })
354 .argument(InputValue::new("of", TypeRef::named(&of_enum_name)))
355 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name)))
356 .argument(InputValue::new("if", TypeRef::named(&filter_name)));
357
358 record_fields.push(metric_field);
359 }
360
361 let mut record = Object::new(&record_name);
362 for f in record_fields { record = record.field(f); }
363
364 let mut filter = InputObject::new(&filter_name);
365 for f in filter_fields { filter = filter.field(f); }
366
367 let mut orderby = Enum::new(&orderby_name);
368 for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
369
370 let field_enum_name = format!("{}_Field", orderby_name);
372 let orderby_input_name = format!("{}OrderByInput", cube.name);
373 let mut field_enum = Enum::new(&field_enum_name);
374 let flat_dims = cube.flat_dimensions();
375 for (path, _) in &flat_dims {
376 field_enum = field_enum.item(EnumItem::new(path));
377 }
378 let orderby_input = InputObject::new(&orderby_input_name)
379 .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name)))
380 .field(InputValue::new("direction", TypeRef::named("OrderDirection")));
381
382 let mut objects = vec![record]; objects.extend(extra_objects);
383 let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
384 let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
385
386 CubeTypes { objects, inputs, enums }
387}
388
/// Mutable accumulators threaded through `collect_dimension_types` while it
/// walks a cube's dimension tree.
struct DimCollector<'a> {
    /// Cube name, used as the prefix of generated nested type names.
    cube_name: &'a str,
    /// Fields of the record object currently being assembled.
    record_fields: &'a mut Vec<Field>,
    /// Fields of the filter input currently being assembled.
    filter_fields: &'a mut Vec<InputValue>,
    /// Order-by enum item names (`{path}_ASC` / `{path}_DESC`); shared across
    /// all nesting levels.
    orderby_items: &'a mut Vec<String>,
    /// Nested record objects generated for dimension groups.
    extra_objects: &'a mut Vec<Object>,
    /// Nested filter inputs generated for dimension groups.
    extra_inputs: &'a mut Vec<InputObject>,
}
397
398fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
399 match node {
400 DimensionNode::Leaf(dim) => {
401 let col = dim.column.clone();
402 let is_datetime = dim.dim_type == DimType::DateTime;
403 let leaf_field = Field::new(
404 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
405 move |ctx| {
406 let col = col.clone();
407 FieldFuture::new(async move {
408 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
409 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
410 let gql_val = if is_datetime {
411 json_to_gql_datetime(val)
412 } else {
413 json_to_gql_value(val)
414 };
415 Ok(Some(FieldValue::value(gql_val)))
416 })
417 },
418 );
419 c.record_fields.push(leaf_field);
420 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
421
422 let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
423 c.orderby_items.push(format!("{path}_ASC"));
424 c.orderby_items.push(format!("{path}_DESC"));
425 }
426 DimensionNode::Group { graphql_name, children } => {
427 let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
428 let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
429 let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
430
431 let mut child_record_fields: Vec<Field> = Vec::new();
432 let mut child_filter_fields: Vec<InputValue> = Vec::new();
433 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
434
435 let mut child_collector = DimCollector {
436 cube_name: c.cube_name,
437 record_fields: &mut child_record_fields,
438 filter_fields: &mut child_filter_fields,
439 orderby_items: c.orderby_items,
440 extra_objects: c.extra_objects,
441 extra_inputs: c.extra_inputs,
442 };
443 for child in children {
444 collect_dimension_types(child, &new_prefix, &mut child_collector);
445 }
446
447 let mut nested_record = Object::new(&nested_record_name);
448 for f in child_record_fields { nested_record = nested_record.field(f); }
449
450 let mut nested_filter = InputObject::new(&nested_filter_name);
451 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
452
453 let group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
454 FieldFuture::new(async move {
455 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
456 Ok(Some(FieldValue::owned_any(row.clone())))
457 })
458 });
459 c.record_fields.push(group_field);
460 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
461 c.extra_objects.push(nested_record);
462 c.extra_inputs.push(nested_filter);
463 }
464 }
465}
466
467fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
468 match dt {
469 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
470 DimType::Int => TypeRef::named(TypeRef::INT),
471 DimType::Float => TypeRef::named(TypeRef::FLOAT),
472 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
473 }
474}
475
476fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
477 match dt {
478 DimType::String => "StringFilter",
479 DimType::Int => "IntFilter",
480 DimType::Float => "FloatFilter",
481 DimType::DateTime => "DateTimeFilter",
482 DimType::Bool => "BoolFilter",
483 }
484}
485
486pub fn json_to_gql_value(v: serde_json::Value) -> Value {
487 match v {
488 serde_json::Value::Null => Value::Null,
489 serde_json::Value::Bool(b) => Value::from(b),
490 serde_json::Value::Number(n) => {
491 if let Some(i) = n.as_i64() { Value::from(i) }
492 else if let Some(f) = n.as_f64() { Value::from(f) }
493 else { Value::from(n.to_string()) }
494 }
495 serde_json::Value::String(s) => Value::from(s),
496 _ => Value::from(v.to_string()),
497 }
498}
499
500fn json_to_gql_datetime(v: serde_json::Value) -> Value {
503 match v {
504 serde_json::Value::String(s) => {
505 let iso = if s.contains('T') {
506 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
507 } else {
508 let replaced = s.replacen(' ', "T", 1);
509 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
510 };
511 Value::from(iso)
512 }
513 other => json_to_gql_value(other),
514 }
515}