1use std::sync::Arc;
2use async_graphql::dynamic::*;
3use async_graphql::Value;
4
5use crate::compiler;
6use crate::compiler::ir::SqlValue;
7use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
8use crate::cube::registry::CubeRegistry;
9use crate::response::RowMap;
10use crate::schema::filter_types;
11use crate::sql::dialect::SqlDialect;
12
13pub type QueryExecutor = Arc<
16 dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
17 Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
18 > + Send + Sync,
19>;
20
/// Settings consumed when the GraphQL schema is assembled.
pub struct SchemaConfig {
    /// Names that become the items of the `Network` GraphQL enum.
    pub networks: Vec<String>,
    /// Name intended for the root query.
    ///
    /// NOTE(review): nothing in the visible code reads this field; the root
    /// object is built with the literal name `"Query"` — confirm intent.
    pub root_query_name: String,
}
26
27impl Default for SchemaConfig {
28 fn default() -> Self {
29 Self {
30 networks: vec![
31 "sol", "eth", "bsc", "base", "polygon",
32 "arbitrum", "optimism", "avalanche", "sui",
33 ].into_iter().map(String::from).collect(),
34 root_query_name: "ChainStream".to_string(),
35 }
36 }
37}
38
39pub fn build_schema(
41 registry: CubeRegistry,
42 dialect: Arc<dyn SqlDialect>,
43 executor: QueryExecutor,
44 config: SchemaConfig,
45) -> Result<Schema, SchemaError> {
46 let mut builder = Schema::build("Query", None, None);
47
48 let mut network_enum = Enum::new("Network");
50 for net in &config.networks {
51 network_enum = network_enum.item(EnumItem::new(net));
52 }
53 builder = builder.register(network_enum);
54 builder = builder.register(filter_types::build_limit_input());
55
56 for input in filter_types::build_filter_primitives() {
57 builder = builder.register(input);
58 }
59
60 let mut query = Object::new("Query");
63
64 for cube in registry.cubes() {
65 let types = build_cube_types(cube);
66 for obj in types.objects { builder = builder.register(obj); }
67 for inp in types.inputs { builder = builder.register(inp); }
68 for en in types.enums { builder = builder.register(en); }
69
70 let cube_name = cube.name.clone();
71 let dialect_clone = dialect.clone();
72 let executor_clone = executor.clone();
73
74 let mut field = Field::new(
75 &cube.name,
76 TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
77 move |ctx| {
78 let cube_name = cube_name.clone();
79 let dialect = dialect_clone.clone();
80 let executor = executor_clone.clone();
81 FieldFuture::new(async move {
82 let registry = ctx.ctx.data::<CubeRegistry>()?;
83 let network_val = ctx.args.try_get("network")?;
84 let network = network_val.enum_name()
85 .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;
86
87 let cube_def = registry.get(&cube_name).ok_or_else(|| {
88 async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
89 })?;
90
91 let metric_requests = extract_metric_requests(&ctx, cube_def);
92 let ir = compiler::parser::parse_cube_query(cube_def, network, &ctx.args, &metric_requests)?;
93 let validated = compiler::validator::validate(ir)?;
94 let (sql, bindings) = dialect.compile(&validated);
95
96 let rows = executor(sql, bindings).await.map_err(|e| {
97 async_graphql::Error::new(format!("Query execution failed: {e}"))
98 })?;
99
100 let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
101 Ok(Some(FieldValue::list(values)))
102 })
103 },
104 )
105 .argument(InputValue::new("network", TypeRef::named_nn("Network")))
106 .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name))))
107 .argument(InputValue::new("limit", TypeRef::named("LimitInput")))
108 .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name))));
109
110 for sel in &cube.selectors {
111 let filter_type = dim_type_to_filter_name(&sel.dim_type);
112 field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type)));
113 }
114
115 query = query.field(field);
116 }
117
118 builder = builder.register(query);
119 builder = builder.data(registry);
120
121 builder.finish()
122}
123
124fn extract_metric_requests(
128 ctx: &async_graphql::dynamic::ResolverContext,
129 cube: &CubeDefinition,
130) -> Vec<compiler::parser::MetricRequest> {
131 let mut requests = Vec::new();
132
133 for sub_field in ctx.ctx.field().selection_set() {
134 let name = sub_field.name();
135 if !cube.metrics.contains(&name.to_string()) {
136 continue;
137 }
138
139 let args = match sub_field.arguments() {
140 Ok(args) => args,
141 Err(_) => continue,
142 };
143
144 let of_dimension = args
145 .iter()
146 .find(|(k, _)| k.as_str() == "of")
147 .and_then(|(_, v)| match v {
148 async_graphql::Value::Enum(e) => Some(e.to_string()),
149 async_graphql::Value::String(s) => Some(s.clone()),
150 _ => None,
151 })
152 .unwrap_or_else(|| "*".to_string());
153
154 let select_where_value = args
155 .iter()
156 .find(|(k, _)| k.as_str() == "selectWhere")
157 .map(|(_, v)| v.clone());
158
159 requests.push(compiler::parser::MetricRequest {
160 function: name.to_string(),
161 of_dimension,
162 select_where_value,
163 });
164 }
165
166 requests
167}
168
169struct CubeTypes {
174 objects: Vec<Object>,
175 inputs: Vec<InputObject>,
176 enums: Vec<Enum>,
177}
178
179fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
180 let record_name = format!("{}Record", cube.name);
181 let filter_name = format!("{}Filter", cube.name);
182 let orderby_name = format!("{}OrderBy", cube.name);
183
184 let mut record_fields: Vec<Field> = Vec::new();
185 let mut filter_fields: Vec<InputValue> = Vec::new();
186 let mut orderby_items: Vec<String> = Vec::new();
187 let mut extra_objects: Vec<Object> = Vec::new();
188 let mut extra_inputs: Vec<InputObject> = Vec::new();
189
190 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name)));
191
192 {
193 let mut collector = DimCollector {
194 cube_name: &cube.name,
195 record_fields: &mut record_fields,
196 filter_fields: &mut filter_fields,
197 orderby_items: &mut orderby_items,
198 extra_objects: &mut extra_objects,
199 extra_inputs: &mut extra_inputs,
200 };
201 for node in &cube.dimensions {
202 collect_dimension_types(node, "", &mut collector);
203 }
204 }
205
206 let flat_dims = cube.flat_dimensions();
207 let mut metric_enums: Vec<Enum> = Vec::new();
208 for metric in &cube.metrics {
209 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
210 extra_inputs.push(
211 InputObject::new(&select_where_name)
212 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
213 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)))
214 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
215 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)))
216 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))),
217 );
218
219 let of_enum_name = format!("{}_{}_Of", cube.name, metric);
220 let mut of_enum = Enum::new(&of_enum_name);
221 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
222 metric_enums.push(of_enum);
223
224 let metric_clone = metric.clone();
225 let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
226 let metric_key = metric_clone.clone();
227 FieldFuture::new(async move {
228 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
229 let key = format!("__{metric_key}");
230 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
231 Ok(Some(FieldValue::value(json_to_gql_value(val))))
232 })
233 })
234 .argument(InputValue::new("of", TypeRef::named(&of_enum_name)))
235 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name)));
236
237 record_fields.push(metric_field);
238 }
239
240 let mut record = Object::new(&record_name);
241 for f in record_fields { record = record.field(f); }
242
243 let mut filter = InputObject::new(&filter_name);
244 for f in filter_fields { filter = filter.field(f); }
245
246 let mut orderby = Enum::new(&orderby_name);
247 for item in orderby_items { orderby = orderby.item(EnumItem::new(item)); }
248
249 let mut objects = vec![record]; objects.extend(extra_objects);
250 let mut inputs = vec![filter]; inputs.extend(extra_inputs);
251 let mut enums = vec![orderby]; enums.extend(metric_enums);
252
253 CubeTypes { objects, inputs, enums }
254}
255
256struct DimCollector<'a> {
257 cube_name: &'a str,
258 record_fields: &'a mut Vec<Field>,
259 filter_fields: &'a mut Vec<InputValue>,
260 orderby_items: &'a mut Vec<String>,
261 extra_objects: &'a mut Vec<Object>,
262 extra_inputs: &'a mut Vec<InputObject>,
263}
264
265fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
266 match node {
267 DimensionNode::Leaf(dim) => {
268 let col = dim.column.clone();
269 let leaf_field = Field::new(
270 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
271 move |ctx| {
272 let col = col.clone();
273 FieldFuture::new(async move {
274 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
275 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
276 Ok(Some(FieldValue::value(json_to_gql_value(val))))
277 })
278 },
279 );
280 c.record_fields.push(leaf_field);
281 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
282
283 let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
284 c.orderby_items.push(format!("{path}_ASC"));
285 c.orderby_items.push(format!("{path}_DESC"));
286 }
287 DimensionNode::Group { graphql_name, children } => {
288 let nested_record_name = format!("{}_{graphql_name}_Record", c.cube_name);
289 let nested_filter_name = format!("{}_{graphql_name}_Filter", c.cube_name);
290
291 let mut child_record_fields: Vec<Field> = Vec::new();
292 let mut child_filter_fields: Vec<InputValue> = Vec::new();
293 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
294
295 let mut child_collector = DimCollector {
296 cube_name: c.cube_name,
297 record_fields: &mut child_record_fields,
298 filter_fields: &mut child_filter_fields,
299 orderby_items: c.orderby_items,
300 extra_objects: c.extra_objects,
301 extra_inputs: c.extra_inputs,
302 };
303 for child in children {
304 collect_dimension_types(child, &new_prefix, &mut child_collector);
305 }
306
307 let mut nested_record = Object::new(&nested_record_name);
308 for f in child_record_fields { nested_record = nested_record.field(f); }
309
310 let mut nested_filter = InputObject::new(&nested_filter_name);
311 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
312
313 let group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
314 FieldFuture::new(async move {
315 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
316 Ok(Some(FieldValue::owned_any(row.clone())))
317 })
318 });
319 c.record_fields.push(group_field);
320 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
321 c.extra_objects.push(nested_record);
322 c.extra_inputs.push(nested_filter);
323 }
324 }
325}
326
327fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
328 match dt {
329 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
330 DimType::Int => TypeRef::named(TypeRef::INT),
331 DimType::Float => TypeRef::named(TypeRef::FLOAT),
332 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
333 }
334}
335
336fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
337 match dt {
338 DimType::String => "StringFilter",
339 DimType::Int => "IntFilter",
340 DimType::Float => "FloatFilter",
341 DimType::DateTime => "DateTimeFilter",
342 DimType::Bool => "BoolFilter",
343 }
344}
345
346pub fn json_to_gql_value(v: serde_json::Value) -> Value {
347 match v {
348 serde_json::Value::Null => Value::Null,
349 serde_json::Value::Bool(b) => Value::from(b),
350 serde_json::Value::Number(n) => {
351 if let Some(i) = n.as_i64() { Value::from(i) }
352 else if let Some(f) = n.as_f64() { Value::from(f) }
353 else { Value::from(n.to_string()) }
354 }
355 serde_json::Value::String(s) => Value::from(s),
356 _ => Value::from(v.to_string()),
357 }
358}