1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode, MetricDef};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
15pub type QueryExecutor = Arc<
18 dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
19 Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
20 > + Send + Sync,
21>;
22
23pub struct SchemaConfig {
25 pub networks: Vec<String>,
26 pub root_query_name: String,
27 pub stats_callback: Option<StatsCallback>,
30}
31
32impl Default for SchemaConfig {
33 fn default() -> Self {
34 Self {
35 networks: vec!["sol", "eth", "bsc"]
36 .into_iter().map(String::from).collect(),
37 root_query_name: "ChainStream".to_string(),
38 stats_callback: None,
39 }
40 }
41}
42
43pub fn build_schema(
45 registry: CubeRegistry,
46 dialect: Arc<dyn SqlDialect>,
47 executor: QueryExecutor,
48 config: SchemaConfig,
49) -> Result<Schema, SchemaError> {
50 let mut builder = Schema::build("Query", None, None);
51
52 let mut network_enum = Enum::new("Network")
53 .description("Blockchain network to query");
54 for net in &config.networks {
55 network_enum = network_enum.item(EnumItem::new(net));
56 }
57 builder = builder.register(network_enum);
58 builder = builder.register(filter_types::build_limit_input());
59
60 builder = builder.register(
61 InputObject::new("LimitByInput")
62 .description("Limit results per group (similar to ClickHouse LIMIT BY)")
63 .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
64 .description("Comma-separated dimension names to group by"))
65 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
66 .description("Maximum rows per group"))
67 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
68 .description("Rows to skip per group")),
69 );
70
71 builder = builder.register(
72 Enum::new("OrderDirection")
73 .description("Sort direction")
74 .item(EnumItem::new("ASC").description("Ascending"))
75 .item(EnumItem::new("DESC").description("Descending")),
76 );
77
78 for input in filter_types::build_filter_primitives() {
79 builder = builder.register(input);
80 }
81
82 let mut query = Object::new("Query");
85
86 for cube in registry.cubes() {
87 let types = build_cube_types(cube);
88 for obj in types.objects { builder = builder.register(obj); }
89 for inp in types.inputs { builder = builder.register(inp); }
90 for en in types.enums { builder = builder.register(en); }
91
92 let cube_name = cube.name.clone();
93 let dialect_clone = dialect.clone();
94 let executor_clone = executor.clone();
95 let stats_cb = config.stats_callback.clone();
96
97 let orderby_list_input_name = format!("{}OrderByInput", cube.name);
98
99 let cube_description = cube.description.clone();
100 let mut field = Field::new(
101 &cube.name,
102 TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
103 move |ctx| {
104 let cube_name = cube_name.clone();
105 let dialect = dialect_clone.clone();
106 let executor = executor_clone.clone();
107 let stats_cb = stats_cb.clone();
108 FieldFuture::new(async move {
109 let registry = ctx.ctx.data::<CubeRegistry>()?;
110 let network_val = ctx.args.try_get("network")?;
111 let network = network_val.enum_name()
112 .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;
113
114 let cube_def = registry.get(&cube_name).ok_or_else(|| {
115 async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
116 })?;
117
118 let metric_requests = extract_metric_requests(&ctx, cube_def);
119 let requested = extract_requested_fields(&ctx, cube_def);
120 let mut ir = compiler::parser::parse_cube_query(
121 cube_def,
122 network,
123 &ctx.args,
124 &metric_requests,
125 Some(requested),
126 )?;
127
128 let mut join_idx = 0usize;
130 for sub_field in ctx.ctx.field().selection_set() {
131 let fname = sub_field.name().to_string();
132 let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
133 if let Some(jd) = join_def {
134 if let Some(target_cube) = registry.get(&jd.target_cube) {
135 let join_expr = build_join_expr(
136 jd, target_cube, &sub_field, network, join_idx,
137 );
138 ir.joins.push(join_expr);
139 join_idx += 1;
140 }
141 }
142 }
143
144 let validated = compiler::validator::validate(ir)?;
145 let result = dialect.compile(&validated);
146 let sql = result.sql;
147 let bindings = result.bindings;
148
149 let rows = executor(sql.clone(), bindings).await.map_err(|e| {
150 async_graphql::Error::new(format!("Query execution failed: {e}"))
151 })?;
152
153 let rows = if result.alias_remap.is_empty() {
155 rows
156 } else {
157 rows.into_iter().map(|mut row| {
158 for (alias, original) in &result.alias_remap {
159 if let Some(val) = row.shift_remove(alias) {
160 row.entry(original.clone()).or_insert(val);
161 }
162 }
163 row
164 }).collect()
165 };
166
167 let rows: Vec<RowMap> = if validated.joins.is_empty() {
169 rows
170 } else {
171 rows.into_iter().map(|mut row| {
172 for join in &validated.joins {
173 let prefix = format!("{}.", join.alias);
174 let mut sub_row = RowMap::new();
175 let keys: Vec<String> = row.keys()
176 .filter(|k| k.starts_with(&prefix))
177 .cloned()
178 .collect();
179 for key in keys {
180 if let Some(val) = row.shift_remove(&key) {
181 sub_row.insert(key[prefix.len()..].to_string(), val);
182 }
183 }
184 let obj: serde_json::Map<String, serde_json::Value> =
185 sub_row.into_iter().collect();
186 row.insert(
187 join.join_field.clone(),
188 serde_json::Value::Object(obj),
189 );
190 }
191 row
192 }).collect()
193 };
194
195 let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
196 .or_else(|| stats_cb.clone());
197 if let Some(cb) = effective_cb {
198 let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
199 cb(stats);
200 }
201
202 let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
203 Ok(Some(FieldValue::list(values)))
204 })
205 },
206 );
207 if !cube_description.is_empty() {
208 field = field.description(&cube_description);
209 }
210 field = field
211 .argument(InputValue::new("network", TypeRef::named_nn("Network"))
212 .description("Blockchain network to query"))
213 .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
214 .description("Filter conditions"))
215 .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
216 .description("Pagination control"))
217 .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
218 .description("Per-group row limit"))
219 .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name)))
220 .description("Sort order (single column)"))
221 .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name))
222 .description("Sort order (multiple columns)"));
223
224 for sel in &cube.selectors {
225 let filter_type = dim_type_to_filter_name(&sel.dim_type);
226 field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
227 .description(format!("Shorthand filter for {}", sel.graphql_name)));
228 }
229
230 query = query.field(field);
231 }
232
233 let metadata_registry = Arc::new(registry.clone());
234 let metadata_field = Field::new(
235 "_cubeMetadata",
236 TypeRef::named_nn(TypeRef::STRING),
237 move |_ctx| {
238 let reg = metadata_registry.clone();
239 FieldFuture::new(async move {
240 let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
241 serde_json::json!({
242 "name": cube.name,
243 "description": cube.description,
244 "schema": cube.schema,
245 "tablePattern": cube.table_pattern,
246 "metrics": cube.metrics.iter().map(|m| {
247 let mut obj = serde_json::json!({
248 "name": m.name,
249 "returnType": format!("{:?}", m.return_type),
250 });
251 if let Some(ref tmpl) = m.expression_template {
252 obj["expressionTemplate"] = serde_json::Value::String(tmpl.clone());
253 }
254 if let Some(ref desc) = m.description {
255 obj["description"] = serde_json::Value::String(desc.clone());
256 }
257 obj
258 }).collect::<Vec<_>>(),
259 "selectors": cube.selectors.iter().map(|s| {
260 serde_json::json!({
261 "name": s.graphql_name,
262 "column": s.column,
263 "type": format!("{:?}", s.dim_type),
264 })
265 }).collect::<Vec<_>>(),
266 "dimensions": serialize_dims(&cube.dimensions),
267 "joins": cube.joins.iter().map(|j| {
268 serde_json::json!({
269 "field": j.field_name,
270 "target": j.target_cube,
271 "joinType": format!("{:?}", j.join_type),
272 })
273 }).collect::<Vec<_>>(),
274 "tableRoutes": cube.table_routes.iter().map(|r| {
275 serde_json::json!({
276 "schema": r.schema,
277 "tablePattern": r.table_pattern,
278 "availableColumns": r.available_columns,
279 "priority": r.priority,
280 })
281 }).collect::<Vec<_>>(),
282 "defaultLimit": cube.default_limit,
283 "maxLimit": cube.max_limit,
284 })
285 }).collect();
286 let json = serde_json::to_string(&metadata).unwrap_or_default();
287 Ok(Some(FieldValue::value(Value::from(json))))
288 })
289 },
290 )
291 .description("Internal: returns JSON metadata about all cubes");
292 query = query.field(metadata_field);
293
294 builder = builder.register(query);
295 builder = builder.data(registry);
296
297 builder.finish()
298}
299
300fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
301 serde_json::Value::Array(dims.iter().map(|d| match d {
302 DimensionNode::Leaf(dim) => {
303 let mut obj = serde_json::json!({
304 "name": dim.graphql_name,
305 "column": dim.column,
306 "type": format!("{:?}", dim.dim_type),
307 });
308 if let Some(desc) = &dim.description {
309 obj["description"] = serde_json::Value::String(desc.clone());
310 }
311 obj
312 },
313 DimensionNode::Group { graphql_name, description, children } => {
314 let mut obj = serde_json::json!({
315 "name": graphql_name,
316 "children": serialize_dims(children),
317 });
318 if let Some(desc) = description {
319 obj["description"] = serde_json::Value::String(desc.clone());
320 }
321 obj
322 },
323 }).collect())
324}
325
326fn extract_metric_requests(
330 ctx: &async_graphql::dynamic::ResolverContext,
331 cube: &CubeDefinition,
332) -> Vec<compiler::parser::MetricRequest> {
333 let mut requests = Vec::new();
334
335 for sub_field in ctx.ctx.field().selection_set() {
336 let name = sub_field.name();
337 if !cube.has_metric(name) {
338 continue;
339 }
340
341 let args = match sub_field.arguments() {
342 Ok(args) => args,
343 Err(_) => continue,
344 };
345
346 let of_dimension = args
347 .iter()
348 .find(|(k, _)| k.as_str() == "of")
349 .and_then(|(_, v)| match v {
350 async_graphql::Value::Enum(e) => Some(e.to_string()),
351 async_graphql::Value::String(s) => Some(s.clone()),
352 _ => None,
353 })
354 .unwrap_or_else(|| "*".to_string());
355
356 let select_where_value = args
357 .iter()
358 .find(|(k, _)| k.as_str() == "selectWhere")
359 .map(|(_, v)| v.clone());
360
361 let condition_filter = args
362 .iter()
363 .find(|(k, _)| k.as_str() == "if")
364 .and_then(|(_, v)| {
365 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
366 .and_then(|f| if f.is_empty() { None } else { Some(f) })
367 });
368
369 requests.push(compiler::parser::MetricRequest {
370 function: name.to_string(),
371 of_dimension,
372 select_where_value,
373 condition_filter,
374 });
375 }
376
377 requests
378}
379
380fn extract_requested_fields(
381 ctx: &async_graphql::dynamic::ResolverContext,
382 cube: &CubeDefinition,
383) -> HashSet<String> {
384 let mut fields = HashSet::new();
385 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
386 fields
387}
388
389fn collect_selection_paths(
390 field: &async_graphql::SelectionField<'_>,
391 prefix: &str,
392 out: &mut HashSet<String>,
393 metrics: &[MetricDef],
394) {
395 for sub in field.selection_set() {
396 let name = sub.name();
397 if metrics.iter().any(|m| m.name == name) {
398 continue;
399 }
400 let path = if prefix.is_empty() {
401 name.to_string()
402 } else {
403 format!("{prefix}_{name}")
404 };
405 let has_children = sub.selection_set().next().is_some();
406 if has_children {
407 collect_selection_paths(&sub, &path, out, metrics);
408 } else {
409 out.insert(path);
410 }
411 }
412}
413
414struct CubeTypes {
419 objects: Vec<Object>,
420 inputs: Vec<InputObject>,
421 enums: Vec<Enum>,
422}
423
424fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
425 let record_name = format!("{}Record", cube.name);
426 let filter_name = format!("{}Filter", cube.name);
427 let orderby_name = format!("{}OrderBy", cube.name);
428
429 let mut record_fields: Vec<Field> = Vec::new();
430 let mut filter_fields: Vec<InputValue> = Vec::new();
431 let mut orderby_items: Vec<String> = Vec::new();
432 let mut extra_objects: Vec<Object> = Vec::new();
433 let mut extra_inputs: Vec<InputObject> = Vec::new();
434
435 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
436 .description("OR combinator — matches if any sub-filter matches"));
437
438 {
439 let mut collector = DimCollector {
440 cube_name: &cube.name,
441 record_fields: &mut record_fields,
442 filter_fields: &mut filter_fields,
443 orderby_items: &mut orderby_items,
444 extra_objects: &mut extra_objects,
445 extra_inputs: &mut extra_inputs,
446 };
447 for node in &cube.dimensions {
448 collect_dimension_types(node, "", &mut collector);
449 }
450 }
451
452 let flat_dims = cube.flat_dimensions();
453 let mut metric_enums: Vec<Enum> = Vec::new();
454 let builtin_descs: std::collections::HashMap<&str, &str> = [
455 ("count", "Count of rows or distinct values"),
456 ("sum", "Sum of values"),
457 ("avg", "Average of values"),
458 ("min", "Minimum value"),
459 ("max", "Maximum value"),
460 ("uniq", "Count of unique (distinct) values"),
461 ].into_iter().collect();
462
463 for metric in &cube.metrics {
464 let metric_name = &metric.name;
465 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric_name);
466
467 if metric.supports_where {
468 extra_inputs.push(
469 InputObject::new(&select_where_name)
470 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric_name))
471 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
472 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
473 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
474 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
475 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
476 );
477 }
478
479 let of_enum_name = format!("{}_{}_Of", cube.name, metric_name);
480 let mut of_enum = Enum::new(&of_enum_name)
481 .description(format!("Dimension to apply {} aggregation on", metric_name));
482 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
483 metric_enums.push(of_enum);
484
485 let metric_name_clone = metric_name.clone();
486 let return_type_ref = dim_type_to_typeref(&metric.return_type);
487 let metric_desc = metric.description.as_deref()
488 .or_else(|| builtin_descs.get(metric_name.as_str()).copied())
489 .unwrap_or("Aggregate metric");
490
491 let mut metric_field = Field::new(metric_name, return_type_ref, move |ctx| {
492 let metric_key = metric_name_clone.clone();
493 FieldFuture::new(async move {
494 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
495 let key = format!("__{metric_key}");
496 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
497 Ok(Some(FieldValue::value(json_to_gql_value(val))))
498 })
499 })
500 .description(metric_desc)
501 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
502 .description("Dimension to aggregate on (default: all rows)"));
503
504 if metric.supports_where {
505 metric_field = metric_field
506 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
507 .description("Post-aggregation filter (HAVING)"))
508 .argument(InputValue::new("if", TypeRef::named(&filter_name))
509 .description("Conditional filter for this metric"));
510 }
511
512 record_fields.push(metric_field);
513 }
514
515 for jd in &cube.joins {
517 let target_record_name = format!("{}Record", jd.target_cube);
518 let field_name_owned = jd.field_name.clone();
519 let mut join_field = Field::new(
520 &jd.field_name,
521 TypeRef::named(&target_record_name),
522 move |ctx| {
523 let field_name = field_name_owned.clone();
524 FieldFuture::new(async move {
525 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
526 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
527 let sub_row: RowMap = obj.iter()
528 .map(|(k, v)| (k.clone(), v.clone()))
529 .collect();
530 Ok(Some(FieldValue::owned_any(sub_row)))
531 } else {
532 Ok(Some(FieldValue::value(Value::Null)))
533 }
534 })
535 },
536 );
537 if let Some(desc) = &jd.description {
538 join_field = join_field.description(desc);
539 }
540 record_fields.push(join_field);
541 }
542
543 let mut record = Object::new(&record_name);
544 for f in record_fields { record = record.field(f); }
545
546 let mut filter = InputObject::new(&filter_name)
547 .description(format!("Filter conditions for {} query", cube.name));
548 for f in filter_fields { filter = filter.field(f); }
549
550 let mut orderby = Enum::new(&orderby_name)
551 .description(format!("Sort order for {} results (single column)", cube.name));
552 for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
553
554 let field_enum_name = format!("{}_Field", orderby_name);
556 let orderby_input_name = format!("{}OrderByInput", cube.name);
557 let mut field_enum = Enum::new(&field_enum_name)
558 .description(format!("Available fields for {} multi-column sort", cube.name));
559 let flat_dims = cube.flat_dimensions();
560 for (path, _) in &flat_dims {
561 field_enum = field_enum.item(EnumItem::new(path));
562 }
563 let orderby_input = InputObject::new(&orderby_input_name)
564 .description(format!("Multi-column sort input for {}", cube.name))
565 .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name))
566 .description("Field to sort by"))
567 .field(InputValue::new("direction", TypeRef::named("OrderDirection"))
568 .description("Sort direction (ASC or DESC)"));
569
570 let mut objects = vec![record]; objects.extend(extra_objects);
571 let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
572 let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
573
574 CubeTypes { objects, inputs, enums }
575}
576
577struct DimCollector<'a> {
578 cube_name: &'a str,
579 record_fields: &'a mut Vec<Field>,
580 filter_fields: &'a mut Vec<InputValue>,
581 orderby_items: &'a mut Vec<String>,
582 extra_objects: &'a mut Vec<Object>,
583 extra_inputs: &'a mut Vec<InputObject>,
584}
585
586fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
587 match node {
588 DimensionNode::Leaf(dim) => {
589 let col = dim.column.clone();
590 let is_datetime = dim.dim_type == DimType::DateTime;
591 let mut leaf_field = Field::new(
592 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
593 move |ctx| {
594 let col = col.clone();
595 FieldFuture::new(async move {
596 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
597 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
598 let gql_val = if is_datetime {
599 json_to_gql_datetime(val)
600 } else {
601 json_to_gql_value(val)
602 };
603 Ok(Some(FieldValue::value(gql_val)))
604 })
605 },
606 );
607 if let Some(desc) = &dim.description {
608 leaf_field = leaf_field.description(desc);
609 }
610 c.record_fields.push(leaf_field);
611 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
612
613 let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
614 c.orderby_items.push(format!("{path}_ASC"));
615 c.orderby_items.push(format!("{path}_DESC"));
616 }
617 DimensionNode::Group { graphql_name, description, children } => {
618 let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
619 let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
620 let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
621
622 let mut child_record_fields: Vec<Field> = Vec::new();
623 let mut child_filter_fields: Vec<InputValue> = Vec::new();
624 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
625
626 let mut child_collector = DimCollector {
627 cube_name: c.cube_name,
628 record_fields: &mut child_record_fields,
629 filter_fields: &mut child_filter_fields,
630 orderby_items: c.orderby_items,
631 extra_objects: c.extra_objects,
632 extra_inputs: c.extra_inputs,
633 };
634 for child in children {
635 collect_dimension_types(child, &new_prefix, &mut child_collector);
636 }
637
638 let mut nested_record = Object::new(&nested_record_name);
639 for f in child_record_fields { nested_record = nested_record.field(f); }
640
641 let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
642 let mut nested_filter = InputObject::new(&nested_filter_name)
643 .description(nested_filter_desc);
644 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
645
646 let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
647 FieldFuture::new(async move {
648 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
649 Ok(Some(FieldValue::owned_any(row.clone())))
650 })
651 });
652 if let Some(desc) = description {
653 group_field = group_field.description(desc);
654 }
655 c.record_fields.push(group_field);
656 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
657 c.extra_objects.push(nested_record);
658 c.extra_inputs.push(nested_filter);
659 }
660 }
661}
662
663fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
664 match dt {
665 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
666 DimType::Int => TypeRef::named(TypeRef::INT),
667 DimType::Float => TypeRef::named(TypeRef::FLOAT),
668 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
669 }
670}
671
672fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
673 match dt {
674 DimType::String => "StringFilter",
675 DimType::Int => "IntFilter",
676 DimType::Float => "FloatFilter",
677 DimType::DateTime => "DateTimeFilter",
678 DimType::Bool => "BoolFilter",
679 }
680}
681
682pub fn json_to_gql_value(v: serde_json::Value) -> Value {
683 match v {
684 serde_json::Value::Null => Value::Null,
685 serde_json::Value::Bool(b) => Value::from(b),
686 serde_json::Value::Number(n) => {
687 if let Some(i) = n.as_i64() { Value::from(i) }
688 else if let Some(f) = n.as_f64() { Value::from(f) }
689 else { Value::from(n.to_string()) }
690 }
691 serde_json::Value::String(s) => Value::from(s),
692 _ => Value::from(v.to_string()),
693 }
694}
695
696fn build_join_expr(
699 jd: &crate::cube::definition::JoinDef,
700 target_cube: &CubeDefinition,
701 sub_field: &async_graphql::SelectionField<'_>,
702 network: &str,
703 join_idx: usize,
704) -> JoinExpr {
705 let target_flat = target_cube.flat_dimensions();
706 let target_table = target_cube.table_for_chain(network);
707
708 let mut requested_paths = HashSet::new();
709 collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
710
711 let mut selects: Vec<SelectExpr> = target_flat.iter()
712 .filter(|(path, _)| requested_paths.contains(path))
713 .map(|(_, dim)| SelectExpr::Column {
714 column: dim.column.clone(),
715 alias: None,
716 })
717 .collect();
718
719 if selects.is_empty() {
720 selects = target_flat.iter()
721 .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
722 .collect();
723 }
724
725 let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
726
727 let group_by = if is_aggregate {
728 let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
729 for sel in &selects {
730 if let SelectExpr::Column { column, .. } = sel {
731 if !column.contains('(') && !gb.contains(column) {
732 gb.push(column.clone());
733 }
734 }
735 }
736 gb
737 } else {
738 vec![]
739 };
740
741 JoinExpr {
742 schema: target_cube.schema.clone(),
743 table: target_table,
744 alias: format!("_j{}", join_idx),
745 conditions: jd.conditions.clone(),
746 selects,
747 group_by,
748 use_final: target_cube.use_final,
749 is_aggregate,
750 target_cube: jd.target_cube.clone(),
751 join_field: sub_field.name().to_string(),
752 join_type: jd.join_type.clone(),
753 }
754}
755
756fn json_to_gql_datetime(v: serde_json::Value) -> Value {
759 match v {
760 serde_json::Value::String(s) => {
761 let iso = if s.contains('T') {
762 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
763 } else {
764 let replaced = s.replacen(' ', "T", 1);
765 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
766 };
767 Value::from(iso)
768 }
769 other => json_to_gql_value(other),
770 }
771}