1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::SqlValue;
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
15pub type QueryExecutor = Arc<
18 dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
19 Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
20 > + Send + Sync,
21>;
22
23pub struct SchemaConfig {
25 pub networks: Vec<String>,
26 pub root_query_name: String,
27 pub stats_callback: Option<StatsCallback>,
30}
31
32impl Default for SchemaConfig {
33 fn default() -> Self {
34 Self {
35 networks: vec!["sol", "eth", "bsc"]
36 .into_iter().map(String::from).collect(),
37 root_query_name: "ChainStream".to_string(),
38 stats_callback: None,
39 }
40 }
41}
42
43pub fn build_schema(
45 registry: CubeRegistry,
46 dialect: Arc<dyn SqlDialect>,
47 executor: QueryExecutor,
48 config: SchemaConfig,
49) -> Result<Schema, SchemaError> {
50 let mut builder = Schema::build("Query", None, None);
51
52 let mut network_enum = Enum::new("Network");
54 for net in &config.networks {
55 network_enum = network_enum.item(EnumItem::new(net));
56 }
57 builder = builder.register(network_enum);
58 builder = builder.register(filter_types::build_limit_input());
59
60 builder = builder.register(
61 InputObject::new("LimitByInput")
62 .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING)))
63 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT)))
64 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))),
65 );
66
67 builder = builder.register(
69 Enum::new("OrderDirection")
70 .item(EnumItem::new("ASC"))
71 .item(EnumItem::new("DESC")),
72 );
73
74 for input in filter_types::build_filter_primitives() {
75 builder = builder.register(input);
76 }
77
78 let mut query = Object::new("Query");
81
82 for cube in registry.cubes() {
83 let types = build_cube_types(cube);
84 for obj in types.objects { builder = builder.register(obj); }
85 for inp in types.inputs { builder = builder.register(inp); }
86 for en in types.enums { builder = builder.register(en); }
87
88 let cube_name = cube.name.clone();
89 let dialect_clone = dialect.clone();
90 let executor_clone = executor.clone();
91 let stats_cb = config.stats_callback.clone();
92
93 let orderby_list_input_name = format!("{}OrderByInput", cube.name);
94
95 let mut field = Field::new(
96 &cube.name,
97 TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
98 move |ctx| {
99 let cube_name = cube_name.clone();
100 let dialect = dialect_clone.clone();
101 let executor = executor_clone.clone();
102 let stats_cb = stats_cb.clone();
103 FieldFuture::new(async move {
104 let registry = ctx.ctx.data::<CubeRegistry>()?;
105 let network_val = ctx.args.try_get("network")?;
106 let network = network_val.enum_name()
107 .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;
108
109 let cube_def = registry.get(&cube_name).ok_or_else(|| {
110 async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
111 })?;
112
113 let metric_requests = extract_metric_requests(&ctx, cube_def);
114 let requested = extract_requested_fields(&ctx, cube_def);
115 let ir = compiler::parser::parse_cube_query(
116 cube_def,
117 network,
118 &ctx.args,
119 &metric_requests,
120 Some(requested),
121 )?;
122 let validated = compiler::validator::validate(ir)?;
123 let (sql, bindings) = dialect.compile(&validated);
124
125 let rows = executor(sql.clone(), bindings).await.map_err(|e| {
126 async_graphql::Error::new(format!("Query execution failed: {e}"))
127 })?;
128
129 let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
130 .or_else(|| stats_cb.clone());
131 if let Some(cb) = effective_cb {
132 let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
133 cb(stats);
134 }
135
136 let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
137 Ok(Some(FieldValue::list(values)))
138 })
139 },
140 );
141 field = field
142 .argument(InputValue::new("network", TypeRef::named_nn("Network")))
143 .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name))))
144 .argument(InputValue::new("limit", TypeRef::named("LimitInput")))
145 .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput")))
146 .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name))))
147 .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name)));
148
149 for sel in &cube.selectors {
150 let filter_type = dim_type_to_filter_name(&sel.dim_type);
151 field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type)));
152 }
153
154 query = query.field(field);
155 }
156
157 let metadata_registry = Arc::new(registry.clone());
159 let metadata_field = Field::new(
160 "_cubeMetadata",
161 TypeRef::named_nn(TypeRef::STRING),
162 move |_ctx| {
163 let reg = metadata_registry.clone();
164 FieldFuture::new(async move {
165 let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
166 serde_json::json!({
167 "name": cube.name,
168 "schema": cube.schema,
169 "tablePattern": cube.table_pattern,
170 "metrics": cube.metrics,
171 "selectors": cube.selectors.iter().map(|s| {
172 serde_json::json!({
173 "name": s.graphql_name,
174 "column": s.column,
175 "type": format!("{:?}", s.dim_type),
176 })
177 }).collect::<Vec<_>>(),
178 "dimensions": serialize_dims(&cube.dimensions),
179 "defaultLimit": cube.default_limit,
180 "maxLimit": cube.max_limit,
181 })
182 }).collect();
183 let json = serde_json::to_string(&metadata).unwrap_or_default();
184 Ok(Some(FieldValue::value(Value::from(json))))
185 })
186 },
187 );
188 query = query.field(metadata_field);
189
190 builder = builder.register(query);
191 builder = builder.data(registry);
192
193 builder.finish()
194}
195
196fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
197 serde_json::Value::Array(dims.iter().map(|d| match d {
198 DimensionNode::Leaf(dim) => serde_json::json!({
199 "name": dim.graphql_name,
200 "column": dim.column,
201 "type": format!("{:?}", dim.dim_type),
202 }),
203 DimensionNode::Group { graphql_name, children } => serde_json::json!({
204 "name": graphql_name,
205 "children": serialize_dims(children),
206 }),
207 }).collect())
208}
209
210fn extract_metric_requests(
214 ctx: &async_graphql::dynamic::ResolverContext,
215 cube: &CubeDefinition,
216) -> Vec<compiler::parser::MetricRequest> {
217 let mut requests = Vec::new();
218
219 for sub_field in ctx.ctx.field().selection_set() {
220 let name = sub_field.name();
221 if !cube.metrics.contains(&name.to_string()) {
222 continue;
223 }
224
225 let args = match sub_field.arguments() {
226 Ok(args) => args,
227 Err(_) => continue,
228 };
229
230 let of_dimension = args
231 .iter()
232 .find(|(k, _)| k.as_str() == "of")
233 .and_then(|(_, v)| match v {
234 async_graphql::Value::Enum(e) => Some(e.to_string()),
235 async_graphql::Value::String(s) => Some(s.clone()),
236 _ => None,
237 })
238 .unwrap_or_else(|| "*".to_string());
239
240 let select_where_value = args
241 .iter()
242 .find(|(k, _)| k.as_str() == "selectWhere")
243 .map(|(_, v)| v.clone());
244
245 let condition_filter = args
246 .iter()
247 .find(|(k, _)| k.as_str() == "if")
248 .and_then(|(_, v)| {
249 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
250 .and_then(|f| if f.is_empty() { None } else { Some(f) })
251 });
252
253 requests.push(compiler::parser::MetricRequest {
254 function: name.to_string(),
255 of_dimension,
256 select_where_value,
257 condition_filter,
258 });
259 }
260
261 requests
262}
263
264fn extract_requested_fields(
265 ctx: &async_graphql::dynamic::ResolverContext,
266 cube: &CubeDefinition,
267) -> HashSet<String> {
268 let mut fields = HashSet::new();
269 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
270 fields
271}
272
273fn collect_selection_paths(
274 field: &async_graphql::SelectionField<'_>,
275 prefix: &str,
276 out: &mut HashSet<String>,
277 metrics: &[String],
278) {
279 for sub in field.selection_set() {
280 let name = sub.name();
281 if metrics.iter().any(|m| m == name) {
282 continue;
283 }
284 let path = if prefix.is_empty() {
285 name.to_string()
286 } else {
287 format!("{prefix}_{name}")
288 };
289 let has_children = sub.selection_set().next().is_some();
290 if has_children {
291 collect_selection_paths(&sub, &path, out, metrics);
292 } else {
293 out.insert(path);
294 }
295 }
296}
297
298struct CubeTypes {
303 objects: Vec<Object>,
304 inputs: Vec<InputObject>,
305 enums: Vec<Enum>,
306}
307
308fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
309 let record_name = format!("{}Record", cube.name);
310 let filter_name = format!("{}Filter", cube.name);
311 let orderby_name = format!("{}OrderBy", cube.name);
312
313 let mut record_fields: Vec<Field> = Vec::new();
314 let mut filter_fields: Vec<InputValue> = Vec::new();
315 let mut orderby_items: Vec<String> = Vec::new();
316 let mut extra_objects: Vec<Object> = Vec::new();
317 let mut extra_inputs: Vec<InputObject> = Vec::new();
318
319 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name)));
320
321 {
322 let mut collector = DimCollector {
323 cube_name: &cube.name,
324 record_fields: &mut record_fields,
325 filter_fields: &mut filter_fields,
326 orderby_items: &mut orderby_items,
327 extra_objects: &mut extra_objects,
328 extra_inputs: &mut extra_inputs,
329 };
330 for node in &cube.dimensions {
331 collect_dimension_types(node, "", &mut collector);
332 }
333 }
334
335 let flat_dims = cube.flat_dimensions();
336 let mut metric_enums: Vec<Enum> = Vec::new();
337 for metric in &cube.metrics {
338 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
339 extra_inputs.push(
340 InputObject::new(&select_where_name)
341 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
342 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)))
343 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
344 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)))
345 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))),
346 );
347
348 let of_enum_name = format!("{}_{}_Of", cube.name, metric);
349 let mut of_enum = Enum::new(&of_enum_name);
350 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
351 metric_enums.push(of_enum);
352
353 let metric_clone = metric.clone();
354 let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
355 let metric_key = metric_clone.clone();
356 FieldFuture::new(async move {
357 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
358 let key = format!("__{metric_key}");
359 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
360 Ok(Some(FieldValue::value(json_to_gql_value(val))))
361 })
362 })
363 .argument(InputValue::new("of", TypeRef::named(&of_enum_name)))
364 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name)))
365 .argument(InputValue::new("if", TypeRef::named(&filter_name)));
366
367 record_fields.push(metric_field);
368 }
369
370 let mut record = Object::new(&record_name);
371 for f in record_fields { record = record.field(f); }
372
373 let mut filter = InputObject::new(&filter_name);
374 for f in filter_fields { filter = filter.field(f); }
375
376 let mut orderby = Enum::new(&orderby_name);
377 for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
378
379 let field_enum_name = format!("{}_Field", orderby_name);
381 let orderby_input_name = format!("{}OrderByInput", cube.name);
382 let mut field_enum = Enum::new(&field_enum_name);
383 let flat_dims = cube.flat_dimensions();
384 for (path, _) in &flat_dims {
385 field_enum = field_enum.item(EnumItem::new(path));
386 }
387 let orderby_input = InputObject::new(&orderby_input_name)
388 .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name)))
389 .field(InputValue::new("direction", TypeRef::named("OrderDirection")));
390
391 let mut objects = vec![record]; objects.extend(extra_objects);
392 let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
393 let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
394
395 CubeTypes { objects, inputs, enums }
396}
397
398struct DimCollector<'a> {
399 cube_name: &'a str,
400 record_fields: &'a mut Vec<Field>,
401 filter_fields: &'a mut Vec<InputValue>,
402 orderby_items: &'a mut Vec<String>,
403 extra_objects: &'a mut Vec<Object>,
404 extra_inputs: &'a mut Vec<InputObject>,
405}
406
407fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
408 match node {
409 DimensionNode::Leaf(dim) => {
410 let col = dim.column.clone();
411 let is_datetime = dim.dim_type == DimType::DateTime;
412 let leaf_field = Field::new(
413 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
414 move |ctx| {
415 let col = col.clone();
416 FieldFuture::new(async move {
417 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
418 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
419 let gql_val = if is_datetime {
420 json_to_gql_datetime(val)
421 } else {
422 json_to_gql_value(val)
423 };
424 Ok(Some(FieldValue::value(gql_val)))
425 })
426 },
427 );
428 c.record_fields.push(leaf_field);
429 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
430
431 let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
432 c.orderby_items.push(format!("{path}_ASC"));
433 c.orderby_items.push(format!("{path}_DESC"));
434 }
435 DimensionNode::Group { graphql_name, children } => {
436 let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
437 let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
438 let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
439
440 let mut child_record_fields: Vec<Field> = Vec::new();
441 let mut child_filter_fields: Vec<InputValue> = Vec::new();
442 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
443
444 let mut child_collector = DimCollector {
445 cube_name: c.cube_name,
446 record_fields: &mut child_record_fields,
447 filter_fields: &mut child_filter_fields,
448 orderby_items: c.orderby_items,
449 extra_objects: c.extra_objects,
450 extra_inputs: c.extra_inputs,
451 };
452 for child in children {
453 collect_dimension_types(child, &new_prefix, &mut child_collector);
454 }
455
456 let mut nested_record = Object::new(&nested_record_name);
457 for f in child_record_fields { nested_record = nested_record.field(f); }
458
459 let mut nested_filter = InputObject::new(&nested_filter_name);
460 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
461
462 let group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
463 FieldFuture::new(async move {
464 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
465 Ok(Some(FieldValue::owned_any(row.clone())))
466 })
467 });
468 c.record_fields.push(group_field);
469 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
470 c.extra_objects.push(nested_record);
471 c.extra_inputs.push(nested_filter);
472 }
473 }
474}
475
476fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
477 match dt {
478 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
479 DimType::Int => TypeRef::named(TypeRef::INT),
480 DimType::Float => TypeRef::named(TypeRef::FLOAT),
481 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
482 }
483}
484
485fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
486 match dt {
487 DimType::String => "StringFilter",
488 DimType::Int => "IntFilter",
489 DimType::Float => "FloatFilter",
490 DimType::DateTime => "DateTimeFilter",
491 DimType::Bool => "BoolFilter",
492 }
493}
494
495pub fn json_to_gql_value(v: serde_json::Value) -> Value {
496 match v {
497 serde_json::Value::Null => Value::Null,
498 serde_json::Value::Bool(b) => Value::from(b),
499 serde_json::Value::Number(n) => {
500 if let Some(i) = n.as_i64() { Value::from(i) }
501 else if let Some(f) = n.as_f64() { Value::from(f) }
502 else { Value::from(n.to_string()) }
503 }
504 serde_json::Value::String(s) => Value::from(s),
505 _ => Value::from(v.to_string()),
506 }
507}
508
509fn json_to_gql_datetime(v: serde_json::Value) -> Value {
512 match v {
513 serde_json::Value::String(s) => {
514 let iso = if s.contains('T') {
515 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
516 } else {
517 let replaced = s.replacen(' ', "T", 1);
518 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
519 };
520 Value::from(iso)
521 }
522 other => json_to_gql_value(other),
523 }
524}