1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::SqlValue;
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
15pub type QueryExecutor = Arc<
18 dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
19 Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
20 > + Send + Sync,
21>;
22
23pub struct SchemaConfig {
25 pub networks: Vec<String>,
26 pub root_query_name: String,
27 pub stats_callback: Option<StatsCallback>,
30}
31
32impl Default for SchemaConfig {
33 fn default() -> Self {
34 Self {
35 networks: vec!["sol", "eth", "bsc"]
36 .into_iter().map(String::from).collect(),
37 root_query_name: "ChainStream".to_string(),
38 stats_callback: None,
39 }
40 }
41}
42
43pub fn build_schema(
45 registry: CubeRegistry,
46 dialect: Arc<dyn SqlDialect>,
47 executor: QueryExecutor,
48 config: SchemaConfig,
49) -> Result<Schema, SchemaError> {
50 let mut builder = Schema::build("Query", None, None);
51
52 let mut network_enum = Enum::new("Network");
54 for net in &config.networks {
55 network_enum = network_enum.item(EnumItem::new(net));
56 }
57 builder = builder.register(network_enum);
58 builder = builder.register(filter_types::build_limit_input());
59
60 builder = builder.register(
61 InputObject::new("LimitByInput")
62 .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING)))
63 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)))
64 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))),
65 );
66
67 builder = builder.register(
69 Enum::new("OrderDirection")
70 .item(EnumItem::new("ASC"))
71 .item(EnumItem::new("DESC")),
72 );
73
74 for input in filter_types::build_filter_primitives() {
75 builder = builder.register(input);
76 }
77
78 let mut query = Object::new("Query");
81
82 for cube in registry.cubes() {
83 let types = build_cube_types(cube);
84 for obj in types.objects { builder = builder.register(obj); }
85 for inp in types.inputs { builder = builder.register(inp); }
86 for en in types.enums { builder = builder.register(en); }
87
88 let cube_name = cube.name.clone();
89 let dialect_clone = dialect.clone();
90 let executor_clone = executor.clone();
91 let stats_cb = config.stats_callback.clone();
92
93 let orderby_list_input_name = format!("{}OrderByInput", cube.name);
94
95 let mut field = Field::new(
96 &cube.name,
97 TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
98 move |ctx| {
99 let cube_name = cube_name.clone();
100 let dialect = dialect_clone.clone();
101 let executor = executor_clone.clone();
102 let stats_cb = stats_cb.clone();
103 FieldFuture::new(async move {
104 let registry = ctx.ctx.data::<CubeRegistry>()?;
105 let network_val = ctx.args.try_get("network")?;
106 let network = network_val.enum_name()
107 .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;
108
109 let cube_def = registry.get(&cube_name).ok_or_else(|| {
110 async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
111 })?;
112
113 let metric_requests = extract_metric_requests(&ctx, cube_def);
114 let requested = extract_requested_fields(&ctx, cube_def);
115 let ir = compiler::parser::parse_cube_query(
116 cube_def,
117 network,
118 &ctx.args,
119 &metric_requests,
120 Some(requested),
121 )?;
122 let validated = compiler::validator::validate(ir)?;
123 let (sql, bindings) = dialect.compile(&validated);
124
125 let rows = executor(sql.clone(), bindings).await.map_err(|e| {
126 async_graphql::Error::new(format!("Query execution failed: {e}"))
127 })?;
128
129 let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
130 .or_else(|| stats_cb.clone());
131 if let Some(cb) = effective_cb {
132 let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
133 cb(stats);
134 }
135
136 let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
137 Ok(Some(FieldValue::list(values)))
138 })
139 },
140 );
141 field = field
142 .argument(InputValue::new("network", TypeRef::named_nn("Network")))
143 .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name))))
144 .argument(InputValue::new("limit", TypeRef::named("LimitInput")))
145 .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput")))
146 .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name))))
147 .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name)));
148
149 for sel in &cube.selectors {
150 let filter_type = dim_type_to_filter_name(&sel.dim_type);
151 field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type)));
152 }
153
154 query = query.field(field);
155 }
156
157 let metadata_registry = Arc::new(registry.clone());
159 let metadata_field = Field::new(
160 "_cubeMetadata",
161 TypeRef::named_nn(TypeRef::STRING),
162 move |_ctx| {
163 let reg = metadata_registry.clone();
164 FieldFuture::new(async move {
165 let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
166 serde_json::json!({
167 "name": cube.name,
168 "description": cube.description,
169 "schema": cube.schema,
170 "tablePattern": cube.table_pattern,
171 "metrics": cube.metrics,
172 "selectors": cube.selectors.iter().map(|s| {
173 serde_json::json!({
174 "name": s.graphql_name,
175 "column": s.column,
176 "type": format!("{:?}", s.dim_type),
177 })
178 }).collect::<Vec<_>>(),
179 "dimensions": serialize_dims(&cube.dimensions),
180 "defaultLimit": cube.default_limit,
181 "maxLimit": cube.max_limit,
182 })
183 }).collect();
184 let json = serde_json::to_string(&metadata).unwrap_or_default();
185 Ok(Some(FieldValue::value(Value::from(json))))
186 })
187 },
188 );
189 query = query.field(metadata_field);
190
191 builder = builder.register(query);
192 builder = builder.data(registry);
193
194 builder.finish()
195}
196
197fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
198 serde_json::Value::Array(dims.iter().map(|d| match d {
199 DimensionNode::Leaf(dim) => serde_json::json!({
200 "name": dim.graphql_name,
201 "column": dim.column,
202 "type": format!("{:?}", dim.dim_type),
203 }),
204 DimensionNode::Group { graphql_name, children } => serde_json::json!({
205 "name": graphql_name,
206 "children": serialize_dims(children),
207 }),
208 }).collect())
209}
210
211fn extract_metric_requests(
215 ctx: &async_graphql::dynamic::ResolverContext,
216 cube: &CubeDefinition,
217) -> Vec<compiler::parser::MetricRequest> {
218 let mut requests = Vec::new();
219
220 for sub_field in ctx.ctx.field().selection_set() {
221 let name = sub_field.name();
222 if !cube.metrics.contains(&name.to_string()) {
223 continue;
224 }
225
226 let args = match sub_field.arguments() {
227 Ok(args) => args,
228 Err(_) => continue,
229 };
230
231 let of_dimension = args
232 .iter()
233 .find(|(k, _)| k.as_str() == "of")
234 .and_then(|(_, v)| match v {
235 async_graphql::Value::Enum(e) => Some(e.to_string()),
236 async_graphql::Value::String(s) => Some(s.clone()),
237 _ => None,
238 })
239 .unwrap_or_else(|| "*".to_string());
240
241 let select_where_value = args
242 .iter()
243 .find(|(k, _)| k.as_str() == "selectWhere")
244 .map(|(_, v)| v.clone());
245
246 let condition_filter = args
247 .iter()
248 .find(|(k, _)| k.as_str() == "if")
249 .and_then(|(_, v)| {
250 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
251 .and_then(|f| if f.is_empty() { None } else { Some(f) })
252 });
253
254 requests.push(compiler::parser::MetricRequest {
255 function: name.to_string(),
256 of_dimension,
257 select_where_value,
258 condition_filter,
259 });
260 }
261
262 requests
263}
264
265fn extract_requested_fields(
266 ctx: &async_graphql::dynamic::ResolverContext,
267 cube: &CubeDefinition,
268) -> HashSet<String> {
269 let mut fields = HashSet::new();
270 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
271 fields
272}
273
274fn collect_selection_paths(
275 field: &async_graphql::SelectionField<'_>,
276 prefix: &str,
277 out: &mut HashSet<String>,
278 metrics: &[String],
279) {
280 for sub in field.selection_set() {
281 let name = sub.name();
282 if metrics.iter().any(|m| m == name) {
283 continue;
284 }
285 let path = if prefix.is_empty() {
286 name.to_string()
287 } else {
288 format!("{prefix}_{name}")
289 };
290 let has_children = sub.selection_set().next().is_some();
291 if has_children {
292 collect_selection_paths(&sub, &path, out, metrics);
293 } else {
294 out.insert(path);
295 }
296 }
297}
298
299struct CubeTypes {
304 objects: Vec<Object>,
305 inputs: Vec<InputObject>,
306 enums: Vec<Enum>,
307}
308
309fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
310 let record_name = format!("{}Record", cube.name);
311 let filter_name = format!("{}Filter", cube.name);
312 let orderby_name = format!("{}OrderBy", cube.name);
313
314 let mut record_fields: Vec<Field> = Vec::new();
315 let mut filter_fields: Vec<InputValue> = Vec::new();
316 let mut orderby_items: Vec<String> = Vec::new();
317 let mut extra_objects: Vec<Object> = Vec::new();
318 let mut extra_inputs: Vec<InputObject> = Vec::new();
319
320 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name)));
321
322 {
323 let mut collector = DimCollector {
324 cube_name: &cube.name,
325 record_fields: &mut record_fields,
326 filter_fields: &mut filter_fields,
327 orderby_items: &mut orderby_items,
328 extra_objects: &mut extra_objects,
329 extra_inputs: &mut extra_inputs,
330 };
331 for node in &cube.dimensions {
332 collect_dimension_types(node, "", &mut collector);
333 }
334 }
335
336 let flat_dims = cube.flat_dimensions();
337 let mut metric_enums: Vec<Enum> = Vec::new();
338 for metric in &cube.metrics {
339 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
340 extra_inputs.push(
341 InputObject::new(&select_where_name)
342 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
343 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)))
344 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
345 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)))
346 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))),
347 );
348
349 let of_enum_name = format!("{}_{}_Of", cube.name, metric);
350 let mut of_enum = Enum::new(&of_enum_name);
351 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
352 metric_enums.push(of_enum);
353
354 let metric_clone = metric.clone();
355 let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
356 let metric_key = metric_clone.clone();
357 FieldFuture::new(async move {
358 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
359 let key = format!("__{metric_key}");
360 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
361 Ok(Some(FieldValue::value(json_to_gql_value(val))))
362 })
363 })
364 .argument(InputValue::new("of", TypeRef::named(&of_enum_name)))
365 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name)))
366 .argument(InputValue::new("if", TypeRef::named(&filter_name)));
367
368 record_fields.push(metric_field);
369 }
370
371 let mut record = Object::new(&record_name);
372 for f in record_fields { record = record.field(f); }
373
374 let mut filter = InputObject::new(&filter_name);
375 for f in filter_fields { filter = filter.field(f); }
376
377 let mut orderby = Enum::new(&orderby_name);
378 for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
379
380 let field_enum_name = format!("{}_Field", orderby_name);
382 let orderby_input_name = format!("{}OrderByInput", cube.name);
383 let mut field_enum = Enum::new(&field_enum_name);
384 let flat_dims = cube.flat_dimensions();
385 for (path, _) in &flat_dims {
386 field_enum = field_enum.item(EnumItem::new(path));
387 }
388 let orderby_input = InputObject::new(&orderby_input_name)
389 .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name)))
390 .field(InputValue::new("direction", TypeRef::named("OrderDirection")));
391
392 let mut objects = vec![record]; objects.extend(extra_objects);
393 let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
394 let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
395
396 CubeTypes { objects, inputs, enums }
397}
398
399struct DimCollector<'a> {
400 cube_name: &'a str,
401 record_fields: &'a mut Vec<Field>,
402 filter_fields: &'a mut Vec<InputValue>,
403 orderby_items: &'a mut Vec<String>,
404 extra_objects: &'a mut Vec<Object>,
405 extra_inputs: &'a mut Vec<InputObject>,
406}
407
408fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
409 match node {
410 DimensionNode::Leaf(dim) => {
411 let col = dim.column.clone();
412 let is_datetime = dim.dim_type == DimType::DateTime;
413 let leaf_field = Field::new(
414 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
415 move |ctx| {
416 let col = col.clone();
417 FieldFuture::new(async move {
418 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
419 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
420 let gql_val = if is_datetime {
421 json_to_gql_datetime(val)
422 } else {
423 json_to_gql_value(val)
424 };
425 Ok(Some(FieldValue::value(gql_val)))
426 })
427 },
428 );
429 c.record_fields.push(leaf_field);
430 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
431
432 let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
433 c.orderby_items.push(format!("{path}_ASC"));
434 c.orderby_items.push(format!("{path}_DESC"));
435 }
436 DimensionNode::Group { graphql_name, children } => {
437 let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
438 let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
439 let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
440
441 let mut child_record_fields: Vec<Field> = Vec::new();
442 let mut child_filter_fields: Vec<InputValue> = Vec::new();
443 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
444
445 let mut child_collector = DimCollector {
446 cube_name: c.cube_name,
447 record_fields: &mut child_record_fields,
448 filter_fields: &mut child_filter_fields,
449 orderby_items: c.orderby_items,
450 extra_objects: c.extra_objects,
451 extra_inputs: c.extra_inputs,
452 };
453 for child in children {
454 collect_dimension_types(child, &new_prefix, &mut child_collector);
455 }
456
457 let mut nested_record = Object::new(&nested_record_name);
458 for f in child_record_fields { nested_record = nested_record.field(f); }
459
460 let mut nested_filter = InputObject::new(&nested_filter_name);
461 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
462
463 let group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
464 FieldFuture::new(async move {
465 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
466 Ok(Some(FieldValue::owned_any(row.clone())))
467 })
468 });
469 c.record_fields.push(group_field);
470 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
471 c.extra_objects.push(nested_record);
472 c.extra_inputs.push(nested_filter);
473 }
474 }
475}
476
477fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
478 match dt {
479 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
480 DimType::Int => TypeRef::named(TypeRef::INT),
481 DimType::Float => TypeRef::named(TypeRef::FLOAT),
482 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
483 }
484}
485
486fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
487 match dt {
488 DimType::String => "StringFilter",
489 DimType::Int => "IntFilter",
490 DimType::Float => "FloatFilter",
491 DimType::DateTime => "DateTimeFilter",
492 DimType::Bool => "BoolFilter",
493 }
494}
495
496pub fn json_to_gql_value(v: serde_json::Value) -> Value {
497 match v {
498 serde_json::Value::Null => Value::Null,
499 serde_json::Value::Bool(b) => Value::from(b),
500 serde_json::Value::Number(n) => {
501 if let Some(i) = n.as_i64() { Value::from(i) }
502 else if let Some(f) = n.as_f64() { Value::from(f) }
503 else { Value::from(n.to_string()) }
504 }
505 serde_json::Value::String(s) => Value::from(s),
506 _ => Value::from(v.to_string()),
507 }
508}
509
510fn json_to_gql_datetime(v: serde_json::Value) -> Value {
513 match v {
514 serde_json::Value::String(s) => {
515 let iso = if s.contains('T') {
516 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
517 } else {
518 let replaced = s.replacen(' ', "T", 1);
519 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
520 };
521 Value::from(iso)
522 }
523 other => json_to_gql_value(other),
524 }
525}