1use std::collections::HashSet;
2use std::sync::Arc;
3use async_graphql::dynamic::*;
4use async_graphql::Value;
5
6use crate::compiler;
7use crate::compiler::ir::{SqlValue, JoinExpr, SelectExpr};
8use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
9use crate::cube::registry::CubeRegistry;
10use crate::response::RowMap;
11use crate::schema::filter_types;
12use crate::sql::dialect::SqlDialect;
13use crate::stats::{QueryStats, StatsCallback};
14
15pub type QueryExecutor = Arc<
18 dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
19 Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
20 > + Send + Sync,
21>;
22
23pub struct SchemaConfig {
25 pub networks: Vec<String>,
26 pub root_query_name: String,
27 pub stats_callback: Option<StatsCallback>,
30}
31
32impl Default for SchemaConfig {
33 fn default() -> Self {
34 Self {
35 networks: vec!["sol", "eth", "bsc"]
36 .into_iter().map(String::from).collect(),
37 root_query_name: "ChainStream".to_string(),
38 stats_callback: None,
39 }
40 }
41}
42
43pub fn build_schema(
45 registry: CubeRegistry,
46 dialect: Arc<dyn SqlDialect>,
47 executor: QueryExecutor,
48 config: SchemaConfig,
49) -> Result<Schema, SchemaError> {
50 let mut builder = Schema::build("Query", None, None);
51
52 let mut network_enum = Enum::new("Network")
53 .description("Blockchain network to query");
54 for net in &config.networks {
55 network_enum = network_enum.item(EnumItem::new(net));
56 }
57 builder = builder.register(network_enum);
58 builder = builder.register(filter_types::build_limit_input());
59
60 builder = builder.register(
61 InputObject::new("LimitByInput")
62 .description("Limit results per group (similar to ClickHouse LIMIT BY)")
63 .field(InputValue::new("by", TypeRef::named_nn(TypeRef::STRING))
64 .description("Comma-separated dimension names to group by"))
65 .field(InputValue::new("count", TypeRef::named_nn(TypeRef::INT))
66 .description("Maximum rows per group"))
67 .field(InputValue::new("offset", TypeRef::named(TypeRef::INT))
68 .description("Rows to skip per group")),
69 );
70
71 builder = builder.register(
72 Enum::new("OrderDirection")
73 .description("Sort direction")
74 .item(EnumItem::new("ASC").description("Ascending"))
75 .item(EnumItem::new("DESC").description("Descending")),
76 );
77
78 for input in filter_types::build_filter_primitives() {
79 builder = builder.register(input);
80 }
81
82 let mut query = Object::new("Query");
85
86 for cube in registry.cubes() {
87 let types = build_cube_types(cube);
88 for obj in types.objects { builder = builder.register(obj); }
89 for inp in types.inputs { builder = builder.register(inp); }
90 for en in types.enums { builder = builder.register(en); }
91
92 let cube_name = cube.name.clone();
93 let dialect_clone = dialect.clone();
94 let executor_clone = executor.clone();
95 let stats_cb = config.stats_callback.clone();
96
97 let orderby_list_input_name = format!("{}OrderByInput", cube.name);
98
99 let cube_description = cube.description.clone();
100 let mut field = Field::new(
101 &cube.name,
102 TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
103 move |ctx| {
104 let cube_name = cube_name.clone();
105 let dialect = dialect_clone.clone();
106 let executor = executor_clone.clone();
107 let stats_cb = stats_cb.clone();
108 FieldFuture::new(async move {
109 let registry = ctx.ctx.data::<CubeRegistry>()?;
110 let network_val = ctx.args.try_get("network")?;
111 let network = network_val.enum_name()
112 .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;
113
114 let cube_def = registry.get(&cube_name).ok_or_else(|| {
115 async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
116 })?;
117
118 let metric_requests = extract_metric_requests(&ctx, cube_def);
119 let requested = extract_requested_fields(&ctx, cube_def);
120 let mut ir = compiler::parser::parse_cube_query(
121 cube_def,
122 network,
123 &ctx.args,
124 &metric_requests,
125 Some(requested),
126 )?;
127
128 let mut join_idx = 0usize;
130 for sub_field in ctx.ctx.field().selection_set() {
131 let fname = sub_field.name().to_string();
132 let join_def = cube_def.joins.iter().find(|j| j.field_name == fname);
133 if let Some(jd) = join_def {
134 if let Some(target_cube) = registry.get(&jd.target_cube) {
135 let join_expr = build_join_expr(
136 jd, target_cube, &sub_field, network, join_idx,
137 );
138 ir.joins.push(join_expr);
139 join_idx += 1;
140 }
141 }
142 }
143
144 let validated = compiler::validator::validate(ir)?;
145 let result = dialect.compile(&validated);
146 let sql = result.sql;
147 let bindings = result.bindings;
148
149 let rows = executor(sql.clone(), bindings).await.map_err(|e| {
150 async_graphql::Error::new(format!("Query execution failed: {e}"))
151 })?;
152
153 let rows = if result.alias_remap.is_empty() {
155 rows
156 } else {
157 rows.into_iter().map(|mut row| {
158 for (alias, original) in &result.alias_remap {
159 if let Some(val) = row.shift_remove(alias) {
160 row.entry(original.clone()).or_insert(val);
161 }
162 }
163 row
164 }).collect()
165 };
166
167 let rows: Vec<RowMap> = if validated.joins.is_empty() {
169 rows
170 } else {
171 rows.into_iter().map(|mut row| {
172 for join in &validated.joins {
173 let prefix = format!("{}.", join.alias);
174 let mut sub_row = RowMap::new();
175 let keys: Vec<String> = row.keys()
176 .filter(|k| k.starts_with(&prefix))
177 .cloned()
178 .collect();
179 for key in keys {
180 if let Some(val) = row.shift_remove(&key) {
181 sub_row.insert(key[prefix.len()..].to_string(), val);
182 }
183 }
184 let obj: serde_json::Map<String, serde_json::Value> =
185 sub_row.into_iter().collect();
186 row.insert(
187 join.join_field.clone(),
188 serde_json::Value::Object(obj),
189 );
190 }
191 row
192 }).collect()
193 };
194
195 let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
196 .or_else(|| stats_cb.clone());
197 if let Some(cb) = effective_cb {
198 let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
199 cb(stats);
200 }
201
202 let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
203 Ok(Some(FieldValue::list(values)))
204 })
205 },
206 );
207 if !cube_description.is_empty() {
208 field = field.description(&cube_description);
209 }
210 field = field
211 .argument(InputValue::new("network", TypeRef::named_nn("Network"))
212 .description("Blockchain network to query"))
213 .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name)))
214 .description("Filter conditions"))
215 .argument(InputValue::new("limit", TypeRef::named("LimitInput"))
216 .description("Pagination control"))
217 .argument(InputValue::new("limitBy", TypeRef::named("LimitByInput"))
218 .description("Per-group row limit"))
219 .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name)))
220 .description("Sort order (single column)"))
221 .argument(InputValue::new("orderByList", TypeRef::named_list(&orderby_list_input_name))
222 .description("Sort order (multiple columns)"));
223
224 for sel in &cube.selectors {
225 let filter_type = dim_type_to_filter_name(&sel.dim_type);
226 field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type))
227 .description(format!("Shorthand filter for {}", sel.graphql_name)));
228 }
229
230 query = query.field(field);
231 }
232
233 let metadata_registry = Arc::new(registry.clone());
234 let metadata_field = Field::new(
235 "_cubeMetadata",
236 TypeRef::named_nn(TypeRef::STRING),
237 move |_ctx| {
238 let reg = metadata_registry.clone();
239 FieldFuture::new(async move {
240 let metadata: Vec<serde_json::Value> = reg.cubes().map(|cube| {
241 serde_json::json!({
242 "name": cube.name,
243 "description": cube.description,
244 "schema": cube.schema,
245 "tablePattern": cube.table_pattern,
246 "metrics": cube.metrics,
247 "selectors": cube.selectors.iter().map(|s| {
248 serde_json::json!({
249 "name": s.graphql_name,
250 "column": s.column,
251 "type": format!("{:?}", s.dim_type),
252 })
253 }).collect::<Vec<_>>(),
254 "dimensions": serialize_dims(&cube.dimensions),
255 "joins": cube.joins.iter().map(|j| {
256 serde_json::json!({
257 "field": j.field_name,
258 "target": j.target_cube,
259 })
260 }).collect::<Vec<_>>(),
261 "defaultLimit": cube.default_limit,
262 "maxLimit": cube.max_limit,
263 })
264 }).collect();
265 let json = serde_json::to_string(&metadata).unwrap_or_default();
266 Ok(Some(FieldValue::value(Value::from(json))))
267 })
268 },
269 )
270 .description("Internal: returns JSON metadata about all cubes");
271 query = query.field(metadata_field);
272
273 builder = builder.register(query);
274 builder = builder.data(registry);
275
276 builder.finish()
277}
278
279fn serialize_dims(dims: &[DimensionNode]) -> serde_json::Value {
280 serde_json::Value::Array(dims.iter().map(|d| match d {
281 DimensionNode::Leaf(dim) => {
282 let mut obj = serde_json::json!({
283 "name": dim.graphql_name,
284 "column": dim.column,
285 "type": format!("{:?}", dim.dim_type),
286 });
287 if let Some(desc) = &dim.description {
288 obj["description"] = serde_json::Value::String(desc.clone());
289 }
290 obj
291 },
292 DimensionNode::Group { graphql_name, description, children } => {
293 let mut obj = serde_json::json!({
294 "name": graphql_name,
295 "children": serialize_dims(children),
296 });
297 if let Some(desc) = description {
298 obj["description"] = serde_json::Value::String(desc.clone());
299 }
300 obj
301 },
302 }).collect())
303}
304
305fn extract_metric_requests(
309 ctx: &async_graphql::dynamic::ResolverContext,
310 cube: &CubeDefinition,
311) -> Vec<compiler::parser::MetricRequest> {
312 let mut requests = Vec::new();
313
314 for sub_field in ctx.ctx.field().selection_set() {
315 let name = sub_field.name();
316 if !cube.metrics.contains(&name.to_string()) {
317 continue;
318 }
319
320 let args = match sub_field.arguments() {
321 Ok(args) => args,
322 Err(_) => continue,
323 };
324
325 let of_dimension = args
326 .iter()
327 .find(|(k, _)| k.as_str() == "of")
328 .and_then(|(_, v)| match v {
329 async_graphql::Value::Enum(e) => Some(e.to_string()),
330 async_graphql::Value::String(s) => Some(s.clone()),
331 _ => None,
332 })
333 .unwrap_or_else(|| "*".to_string());
334
335 let select_where_value = args
336 .iter()
337 .find(|(k, _)| k.as_str() == "selectWhere")
338 .map(|(_, v)| v.clone());
339
340 let condition_filter = args
341 .iter()
342 .find(|(k, _)| k.as_str() == "if")
343 .and_then(|(_, v)| {
344 compiler::filter::parse_filter_from_value(v, &cube.dimensions).ok()
345 .and_then(|f| if f.is_empty() { None } else { Some(f) })
346 });
347
348 requests.push(compiler::parser::MetricRequest {
349 function: name.to_string(),
350 of_dimension,
351 select_where_value,
352 condition_filter,
353 });
354 }
355
356 requests
357}
358
359fn extract_requested_fields(
360 ctx: &async_graphql::dynamic::ResolverContext,
361 cube: &CubeDefinition,
362) -> HashSet<String> {
363 let mut fields = HashSet::new();
364 collect_selection_paths(&ctx.ctx.field(), "", &mut fields, &cube.metrics);
365 fields
366}
367
368fn collect_selection_paths(
369 field: &async_graphql::SelectionField<'_>,
370 prefix: &str,
371 out: &mut HashSet<String>,
372 metrics: &[String],
373) {
374 for sub in field.selection_set() {
375 let name = sub.name();
376 if metrics.iter().any(|m| m == name) {
377 continue;
378 }
379 let path = if prefix.is_empty() {
380 name.to_string()
381 } else {
382 format!("{prefix}_{name}")
383 };
384 let has_children = sub.selection_set().next().is_some();
385 if has_children {
386 collect_selection_paths(&sub, &path, out, metrics);
387 } else {
388 out.insert(path);
389 }
390 }
391}
392
393struct CubeTypes {
398 objects: Vec<Object>,
399 inputs: Vec<InputObject>,
400 enums: Vec<Enum>,
401}
402
403fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
404 let record_name = format!("{}Record", cube.name);
405 let filter_name = format!("{}Filter", cube.name);
406 let orderby_name = format!("{}OrderBy", cube.name);
407
408 let mut record_fields: Vec<Field> = Vec::new();
409 let mut filter_fields: Vec<InputValue> = Vec::new();
410 let mut orderby_items: Vec<String> = Vec::new();
411 let mut extra_objects: Vec<Object> = Vec::new();
412 let mut extra_inputs: Vec<InputObject> = Vec::new();
413
414 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name))
415 .description("OR combinator — matches if any sub-filter matches"));
416
417 {
418 let mut collector = DimCollector {
419 cube_name: &cube.name,
420 record_fields: &mut record_fields,
421 filter_fields: &mut filter_fields,
422 orderby_items: &mut orderby_items,
423 extra_objects: &mut extra_objects,
424 extra_inputs: &mut extra_inputs,
425 };
426 for node in &cube.dimensions {
427 collect_dimension_types(node, "", &mut collector);
428 }
429 }
430
431 let flat_dims = cube.flat_dimensions();
432 let mut metric_enums: Vec<Enum> = Vec::new();
433 let metric_descriptions: std::collections::HashMap<&str, &str> = [
434 ("count", "Count of rows or distinct values"),
435 ("sum", "Sum of values"),
436 ("avg", "Average of values"),
437 ("min", "Minimum value"),
438 ("max", "Maximum value"),
439 ("uniq", "Count of unique (distinct) values"),
440 ].into_iter().collect();
441
442 for metric in &cube.metrics {
443 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
444 extra_inputs.push(
445 InputObject::new(&select_where_name)
446 .description(format!("Post-aggregation filter for {} (HAVING clause)", metric))
447 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)).description("Greater than"))
448 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)).description("Greater than or equal to"))
449 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)).description("Less than"))
450 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)).description("Less than or equal to"))
451 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING)).description("Equal to")),
452 );
453
454 let of_enum_name = format!("{}_{}_Of", cube.name, metric);
455 let mut of_enum = Enum::new(&of_enum_name)
456 .description(format!("Dimension to apply {} aggregation on", metric));
457 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
458 metric_enums.push(of_enum);
459
460 let metric_clone = metric.clone();
461 let metric_desc = metric_descriptions.get(metric.as_str())
462 .copied()
463 .unwrap_or("Aggregate metric");
464 let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
465 let metric_key = metric_clone.clone();
466 FieldFuture::new(async move {
467 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
468 let key = format!("__{metric_key}");
469 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
470 Ok(Some(FieldValue::value(json_to_gql_value(val))))
471 })
472 })
473 .description(metric_desc)
474 .argument(InputValue::new("of", TypeRef::named(&of_enum_name))
475 .description("Dimension to aggregate on (default: all rows)"))
476 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name))
477 .description("Post-aggregation filter (HAVING)"))
478 .argument(InputValue::new("if", TypeRef::named(&filter_name))
479 .description("Conditional filter for this metric"));
480
481 record_fields.push(metric_field);
482 }
483
484 for jd in &cube.joins {
486 let target_record_name = format!("{}Record", jd.target_cube);
487 let field_name_owned = jd.field_name.clone();
488 let mut join_field = Field::new(
489 &jd.field_name,
490 TypeRef::named(&target_record_name),
491 move |ctx| {
492 let field_name = field_name_owned.clone();
493 FieldFuture::new(async move {
494 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
495 if let Some(serde_json::Value::Object(obj)) = row.get(&field_name) {
496 let sub_row: RowMap = obj.iter()
497 .map(|(k, v)| (k.clone(), v.clone()))
498 .collect();
499 Ok(Some(FieldValue::owned_any(sub_row)))
500 } else {
501 Ok(Some(FieldValue::value(Value::Null)))
502 }
503 })
504 },
505 );
506 if let Some(desc) = &jd.description {
507 join_field = join_field.description(desc);
508 }
509 record_fields.push(join_field);
510 }
511
512 let mut record = Object::new(&record_name);
513 for f in record_fields { record = record.field(f); }
514
515 let mut filter = InputObject::new(&filter_name)
516 .description(format!("Filter conditions for {} query", cube.name));
517 for f in filter_fields { filter = filter.field(f); }
518
519 let mut orderby = Enum::new(&orderby_name)
520 .description(format!("Sort order for {} results (single column)", cube.name));
521 for item in &orderby_items { orderby = orderby.item(EnumItem::new(item)); }
522
523 let field_enum_name = format!("{}_Field", orderby_name);
525 let orderby_input_name = format!("{}OrderByInput", cube.name);
526 let mut field_enum = Enum::new(&field_enum_name)
527 .description(format!("Available fields for {} multi-column sort", cube.name));
528 let flat_dims = cube.flat_dimensions();
529 for (path, _) in &flat_dims {
530 field_enum = field_enum.item(EnumItem::new(path));
531 }
532 let orderby_input = InputObject::new(&orderby_input_name)
533 .description(format!("Multi-column sort input for {}", cube.name))
534 .field(InputValue::new("field", TypeRef::named_nn(&field_enum_name))
535 .description("Field to sort by"))
536 .field(InputValue::new("direction", TypeRef::named("OrderDirection"))
537 .description("Sort direction (ASC or DESC)"));
538
539 let mut objects = vec![record]; objects.extend(extra_objects);
540 let mut inputs = vec![filter, orderby_input]; inputs.extend(extra_inputs);
541 let mut enums = vec![orderby, field_enum]; enums.extend(metric_enums);
542
543 CubeTypes { objects, inputs, enums }
544}
545
546struct DimCollector<'a> {
547 cube_name: &'a str,
548 record_fields: &'a mut Vec<Field>,
549 filter_fields: &'a mut Vec<InputValue>,
550 orderby_items: &'a mut Vec<String>,
551 extra_objects: &'a mut Vec<Object>,
552 extra_inputs: &'a mut Vec<InputObject>,
553}
554
555fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
556 match node {
557 DimensionNode::Leaf(dim) => {
558 let col = dim.column.clone();
559 let is_datetime = dim.dim_type == DimType::DateTime;
560 let mut leaf_field = Field::new(
561 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
562 move |ctx| {
563 let col = col.clone();
564 FieldFuture::new(async move {
565 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
566 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
567 let gql_val = if is_datetime {
568 json_to_gql_datetime(val)
569 } else {
570 json_to_gql_value(val)
571 };
572 Ok(Some(FieldValue::value(gql_val)))
573 })
574 },
575 );
576 if let Some(desc) = &dim.description {
577 leaf_field = leaf_field.description(desc);
578 }
579 c.record_fields.push(leaf_field);
580 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
581
582 let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
583 c.orderby_items.push(format!("{path}_ASC"));
584 c.orderby_items.push(format!("{path}_DESC"));
585 }
586 DimensionNode::Group { graphql_name, description, children } => {
587 let full_path = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
588 let nested_record_name = format!("{}_{full_path}_Record", c.cube_name);
589 let nested_filter_name = format!("{}_{full_path}_Filter", c.cube_name);
590
591 let mut child_record_fields: Vec<Field> = Vec::new();
592 let mut child_filter_fields: Vec<InputValue> = Vec::new();
593 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
594
595 let mut child_collector = DimCollector {
596 cube_name: c.cube_name,
597 record_fields: &mut child_record_fields,
598 filter_fields: &mut child_filter_fields,
599 orderby_items: c.orderby_items,
600 extra_objects: c.extra_objects,
601 extra_inputs: c.extra_inputs,
602 };
603 for child in children {
604 collect_dimension_types(child, &new_prefix, &mut child_collector);
605 }
606
607 let mut nested_record = Object::new(&nested_record_name);
608 for f in child_record_fields { nested_record = nested_record.field(f); }
609
610 let nested_filter_desc = format!("Filter conditions for {}", graphql_name);
611 let mut nested_filter = InputObject::new(&nested_filter_name)
612 .description(nested_filter_desc);
613 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
614
615 let mut group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
616 FieldFuture::new(async move {
617 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
618 Ok(Some(FieldValue::owned_any(row.clone())))
619 })
620 });
621 if let Some(desc) = description {
622 group_field = group_field.description(desc);
623 }
624 c.record_fields.push(group_field);
625 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
626 c.extra_objects.push(nested_record);
627 c.extra_inputs.push(nested_filter);
628 }
629 }
630}
631
632fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
633 match dt {
634 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
635 DimType::Int => TypeRef::named(TypeRef::INT),
636 DimType::Float => TypeRef::named(TypeRef::FLOAT),
637 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
638 }
639}
640
641fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
642 match dt {
643 DimType::String => "StringFilter",
644 DimType::Int => "IntFilter",
645 DimType::Float => "FloatFilter",
646 DimType::DateTime => "DateTimeFilter",
647 DimType::Bool => "BoolFilter",
648 }
649}
650
651pub fn json_to_gql_value(v: serde_json::Value) -> Value {
652 match v {
653 serde_json::Value::Null => Value::Null,
654 serde_json::Value::Bool(b) => Value::from(b),
655 serde_json::Value::Number(n) => {
656 if let Some(i) = n.as_i64() { Value::from(i) }
657 else if let Some(f) = n.as_f64() { Value::from(f) }
658 else { Value::from(n.to_string()) }
659 }
660 serde_json::Value::String(s) => Value::from(s),
661 _ => Value::from(v.to_string()),
662 }
663}
664
665fn build_join_expr(
668 jd: &crate::cube::definition::JoinDef,
669 target_cube: &CubeDefinition,
670 sub_field: &async_graphql::SelectionField<'_>,
671 network: &str,
672 join_idx: usize,
673) -> JoinExpr {
674 let target_flat = target_cube.flat_dimensions();
675 let target_table = target_cube.table_for_chain(network);
676
677 let mut requested_paths = HashSet::new();
678 collect_selection_paths(sub_field, "", &mut requested_paths, &target_cube.metrics);
679
680 let mut selects: Vec<SelectExpr> = target_flat.iter()
681 .filter(|(path, _)| requested_paths.contains(path))
682 .map(|(_, dim)| SelectExpr::Column {
683 column: dim.column.clone(),
684 alias: None,
685 })
686 .collect();
687
688 if selects.is_empty() {
689 selects = target_flat.iter()
690 .map(|(_, dim)| SelectExpr::Column { column: dim.column.clone(), alias: None })
691 .collect();
692 }
693
694 let is_aggregate = target_flat.iter().any(|(_, dim)| dim.column.contains('('));
695
696 let group_by = if is_aggregate {
697 let mut gb: Vec<String> = jd.conditions.iter().map(|(_, r)| r.clone()).collect();
698 for sel in &selects {
699 if let SelectExpr::Column { column, .. } = sel {
700 if !column.contains('(') && !gb.contains(column) {
701 gb.push(column.clone());
702 }
703 }
704 }
705 gb
706 } else {
707 vec![]
708 };
709
710 JoinExpr {
711 schema: target_cube.schema.clone(),
712 table: target_table,
713 alias: format!("_j{}", join_idx),
714 conditions: jd.conditions.clone(),
715 selects,
716 group_by,
717 use_final: target_cube.use_final,
718 is_aggregate,
719 target_cube: jd.target_cube.clone(),
720 join_field: sub_field.name().to_string(),
721 }
722}
723
724fn json_to_gql_datetime(v: serde_json::Value) -> Value {
727 match v {
728 serde_json::Value::String(s) => {
729 let iso = if s.contains('T') {
730 if s.ends_with('Z') || s.contains('+') { s } else { format!("{s}Z") }
731 } else {
732 let replaced = s.replacen(' ', "T", 1);
733 if replaced.ends_with('Z') { replaced } else { format!("{replaced}Z") }
734 };
735 Value::from(iso)
736 }
737 other => json_to_gql_value(other),
738 }
739}