1use std::sync::Arc;
2use async_graphql::dynamic::*;
3use async_graphql::Value;
4
5use crate::compiler;
6use crate::compiler::ir::SqlValue;
7use crate::cube::definition::{CubeDefinition, DimType, DimensionNode};
8use crate::cube::registry::CubeRegistry;
9use crate::response::RowMap;
10use crate::schema::filter_types;
11use crate::sql::dialect::SqlDialect;
12use crate::stats::{QueryStats, StatsCallback};
13
14pub type QueryExecutor = Arc<
17 dyn Fn(String, Vec<SqlValue>) -> std::pin::Pin<
18 Box<dyn std::future::Future<Output = Result<Vec<RowMap>, String>> + Send>,
19 > + Send + Sync,
20>;
21
22pub struct SchemaConfig {
24 pub networks: Vec<String>,
25 pub root_query_name: String,
26 pub stats_callback: Option<StatsCallback>,
29}
30
31impl Default for SchemaConfig {
32 fn default() -> Self {
33 Self {
34 networks: vec![
35 "sol", "eth", "bsc", "base", "polygon",
36 "arbitrum", "optimism", "avalanche", "sui",
37 ].into_iter().map(String::from).collect(),
38 root_query_name: "ChainStream".to_string(),
39 stats_callback: None,
40 }
41 }
42}
43
44pub fn build_schema(
46 registry: CubeRegistry,
47 dialect: Arc<dyn SqlDialect>,
48 executor: QueryExecutor,
49 config: SchemaConfig,
50) -> Result<Schema, SchemaError> {
51 let mut builder = Schema::build("Query", None, None);
52
53 let mut network_enum = Enum::new("Network");
55 for net in &config.networks {
56 network_enum = network_enum.item(EnumItem::new(net));
57 }
58 builder = builder.register(network_enum);
59 builder = builder.register(filter_types::build_limit_input());
60
61 for input in filter_types::build_filter_primitives() {
62 builder = builder.register(input);
63 }
64
65 let mut query = Object::new("Query");
68
69 for cube in registry.cubes() {
70 let types = build_cube_types(cube);
71 for obj in types.objects { builder = builder.register(obj); }
72 for inp in types.inputs { builder = builder.register(inp); }
73 for en in types.enums { builder = builder.register(en); }
74
75 let cube_name = cube.name.clone();
76 let dialect_clone = dialect.clone();
77 let executor_clone = executor.clone();
78 let stats_cb = config.stats_callback.clone();
79
80 let mut field = Field::new(
81 &cube.name,
82 TypeRef::named_nn_list_nn(format!("{}Record", cube.name)),
83 move |ctx| {
84 let cube_name = cube_name.clone();
85 let dialect = dialect_clone.clone();
86 let executor = executor_clone.clone();
87 let stats_cb = stats_cb.clone();
88 FieldFuture::new(async move {
89 let registry = ctx.ctx.data::<CubeRegistry>()?;
90 let network_val = ctx.args.try_get("network")?;
91 let network = network_val.enum_name()
92 .map_err(|_| async_graphql::Error::new("network must be a Network enum value"))?;
93
94 let cube_def = registry.get(&cube_name).ok_or_else(|| {
95 async_graphql::Error::new(format!("Unknown cube: {cube_name}"))
96 })?;
97
98 let metric_requests = extract_metric_requests(&ctx, cube_def);
99 let ir = compiler::parser::parse_cube_query(cube_def, network, &ctx.args, &metric_requests)?;
100 let validated = compiler::validator::validate(ir)?;
101 let (sql, bindings) = dialect.compile(&validated);
102
103 let rows = executor(sql.clone(), bindings).await.map_err(|e| {
104 async_graphql::Error::new(format!("Query execution failed: {e}"))
105 })?;
106
107 let effective_cb = ctx.ctx.data::<StatsCallback>().ok().cloned()
110 .or_else(|| stats_cb.clone());
111 if let Some(cb) = effective_cb {
112 let stats = QueryStats::from_ir(&validated, rows.len(), &sql);
113 cb(stats);
114 }
115
116 let values: Vec<FieldValue> = rows.into_iter().map(FieldValue::owned_any).collect();
117 Ok(Some(FieldValue::list(values)))
118 })
119 },
120 )
121 .argument(InputValue::new("network", TypeRef::named_nn("Network")))
122 .argument(InputValue::new("where", TypeRef::named(format!("{}Filter", cube.name))))
123 .argument(InputValue::new("limit", TypeRef::named("LimitInput")))
124 .argument(InputValue::new("orderBy", TypeRef::named(format!("{}OrderBy", cube.name))));
125
126 for sel in &cube.selectors {
127 let filter_type = dim_type_to_filter_name(&sel.dim_type);
128 field = field.argument(InputValue::new(&sel.graphql_name, TypeRef::named(filter_type)));
129 }
130
131 query = query.field(field);
132 }
133
134 builder = builder.register(query);
135 builder = builder.data(registry);
136
137 builder.finish()
138}
139
140fn extract_metric_requests(
144 ctx: &async_graphql::dynamic::ResolverContext,
145 cube: &CubeDefinition,
146) -> Vec<compiler::parser::MetricRequest> {
147 let mut requests = Vec::new();
148
149 for sub_field in ctx.ctx.field().selection_set() {
150 let name = sub_field.name();
151 if !cube.metrics.contains(&name.to_string()) {
152 continue;
153 }
154
155 let args = match sub_field.arguments() {
156 Ok(args) => args,
157 Err(_) => continue,
158 };
159
160 let of_dimension = args
161 .iter()
162 .find(|(k, _)| k.as_str() == "of")
163 .and_then(|(_, v)| match v {
164 async_graphql::Value::Enum(e) => Some(e.to_string()),
165 async_graphql::Value::String(s) => Some(s.clone()),
166 _ => None,
167 })
168 .unwrap_or_else(|| "*".to_string());
169
170 let select_where_value = args
171 .iter()
172 .find(|(k, _)| k.as_str() == "selectWhere")
173 .map(|(_, v)| v.clone());
174
175 requests.push(compiler::parser::MetricRequest {
176 function: name.to_string(),
177 of_dimension,
178 select_where_value,
179 });
180 }
181
182 requests
183}
184
185struct CubeTypes {
190 objects: Vec<Object>,
191 inputs: Vec<InputObject>,
192 enums: Vec<Enum>,
193}
194
195fn build_cube_types(cube: &CubeDefinition) -> CubeTypes {
196 let record_name = format!("{}Record", cube.name);
197 let filter_name = format!("{}Filter", cube.name);
198 let orderby_name = format!("{}OrderBy", cube.name);
199
200 let mut record_fields: Vec<Field> = Vec::new();
201 let mut filter_fields: Vec<InputValue> = Vec::new();
202 let mut orderby_items: Vec<String> = Vec::new();
203 let mut extra_objects: Vec<Object> = Vec::new();
204 let mut extra_inputs: Vec<InputObject> = Vec::new();
205
206 filter_fields.push(InputValue::new("any", TypeRef::named_list(&filter_name)));
207
208 {
209 let mut collector = DimCollector {
210 cube_name: &cube.name,
211 record_fields: &mut record_fields,
212 filter_fields: &mut filter_fields,
213 orderby_items: &mut orderby_items,
214 extra_objects: &mut extra_objects,
215 extra_inputs: &mut extra_inputs,
216 };
217 for node in &cube.dimensions {
218 collect_dimension_types(node, "", &mut collector);
219 }
220 }
221
222 let flat_dims = cube.flat_dimensions();
223 let mut metric_enums: Vec<Enum> = Vec::new();
224 for metric in &cube.metrics {
225 let select_where_name = format!("{}_{}_SelectWhere", cube.name, metric);
226 extra_inputs.push(
227 InputObject::new(&select_where_name)
228 .field(InputValue::new("gt", TypeRef::named(TypeRef::STRING)))
229 .field(InputValue::new("ge", TypeRef::named(TypeRef::STRING)))
230 .field(InputValue::new("lt", TypeRef::named(TypeRef::STRING)))
231 .field(InputValue::new("le", TypeRef::named(TypeRef::STRING)))
232 .field(InputValue::new("eq", TypeRef::named(TypeRef::STRING))),
233 );
234
235 let of_enum_name = format!("{}_{}_Of", cube.name, metric);
236 let mut of_enum = Enum::new(&of_enum_name);
237 for (path, _) in &flat_dims { of_enum = of_enum.item(EnumItem::new(path)); }
238 metric_enums.push(of_enum);
239
240 let metric_clone = metric.clone();
241 let metric_field = Field::new(metric, TypeRef::named(TypeRef::FLOAT), move |ctx| {
242 let metric_key = metric_clone.clone();
243 FieldFuture::new(async move {
244 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
245 let key = format!("__{metric_key}");
246 let val = row.get(&key).cloned().unwrap_or(serde_json::Value::Null);
247 Ok(Some(FieldValue::value(json_to_gql_value(val))))
248 })
249 })
250 .argument(InputValue::new("of", TypeRef::named(&of_enum_name)))
251 .argument(InputValue::new("selectWhere", TypeRef::named(&select_where_name)));
252
253 record_fields.push(metric_field);
254 }
255
256 let mut record = Object::new(&record_name);
257 for f in record_fields { record = record.field(f); }
258
259 let mut filter = InputObject::new(&filter_name);
260 for f in filter_fields { filter = filter.field(f); }
261
262 let mut orderby = Enum::new(&orderby_name);
263 for item in orderby_items { orderby = orderby.item(EnumItem::new(item)); }
264
265 let mut objects = vec![record]; objects.extend(extra_objects);
266 let mut inputs = vec![filter]; inputs.extend(extra_inputs);
267 let mut enums = vec![orderby]; enums.extend(metric_enums);
268
269 CubeTypes { objects, inputs, enums }
270}
271
272struct DimCollector<'a> {
273 cube_name: &'a str,
274 record_fields: &'a mut Vec<Field>,
275 filter_fields: &'a mut Vec<InputValue>,
276 orderby_items: &'a mut Vec<String>,
277 extra_objects: &'a mut Vec<Object>,
278 extra_inputs: &'a mut Vec<InputObject>,
279}
280
281fn collect_dimension_types(node: &DimensionNode, prefix: &str, c: &mut DimCollector<'_>) {
282 match node {
283 DimensionNode::Leaf(dim) => {
284 let col = dim.column.clone();
285 let leaf_field = Field::new(
286 &dim.graphql_name, dim_type_to_typeref(&dim.dim_type),
287 move |ctx| {
288 let col = col.clone();
289 FieldFuture::new(async move {
290 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
291 let val = row.get(&col).cloned().unwrap_or(serde_json::Value::Null);
292 Ok(Some(FieldValue::value(json_to_gql_value(val))))
293 })
294 },
295 );
296 c.record_fields.push(leaf_field);
297 c.filter_fields.push(InputValue::new(&dim.graphql_name, TypeRef::named(dim_type_to_filter_name(&dim.dim_type))));
298
299 let path = if prefix.is_empty() { dim.graphql_name.clone() } else { format!("{}_{}", prefix, dim.graphql_name) };
300 c.orderby_items.push(format!("{path}_ASC"));
301 c.orderby_items.push(format!("{path}_DESC"));
302 }
303 DimensionNode::Group { graphql_name, children } => {
304 let nested_record_name = format!("{}_{graphql_name}_Record", c.cube_name);
305 let nested_filter_name = format!("{}_{graphql_name}_Filter", c.cube_name);
306
307 let mut child_record_fields: Vec<Field> = Vec::new();
308 let mut child_filter_fields: Vec<InputValue> = Vec::new();
309 let new_prefix = if prefix.is_empty() { graphql_name.clone() } else { format!("{prefix}_{graphql_name}") };
310
311 let mut child_collector = DimCollector {
312 cube_name: c.cube_name,
313 record_fields: &mut child_record_fields,
314 filter_fields: &mut child_filter_fields,
315 orderby_items: c.orderby_items,
316 extra_objects: c.extra_objects,
317 extra_inputs: c.extra_inputs,
318 };
319 for child in children {
320 collect_dimension_types(child, &new_prefix, &mut child_collector);
321 }
322
323 let mut nested_record = Object::new(&nested_record_name);
324 for f in child_record_fields { nested_record = nested_record.field(f); }
325
326 let mut nested_filter = InputObject::new(&nested_filter_name);
327 for f in child_filter_fields { nested_filter = nested_filter.field(f); }
328
329 let group_field = Field::new(graphql_name, TypeRef::named_nn(&nested_record_name), |ctx| {
330 FieldFuture::new(async move {
331 let row = ctx.parent_value.try_downcast_ref::<RowMap>()?;
332 Ok(Some(FieldValue::owned_any(row.clone())))
333 })
334 });
335 c.record_fields.push(group_field);
336 c.filter_fields.push(InputValue::new(graphql_name, TypeRef::named(&nested_filter_name)));
337 c.extra_objects.push(nested_record);
338 c.extra_inputs.push(nested_filter);
339 }
340 }
341}
342
343fn dim_type_to_typeref(dt: &DimType) -> TypeRef {
344 match dt {
345 DimType::String | DimType::DateTime => TypeRef::named(TypeRef::STRING),
346 DimType::Int => TypeRef::named(TypeRef::INT),
347 DimType::Float => TypeRef::named(TypeRef::FLOAT),
348 DimType::Bool => TypeRef::named(TypeRef::BOOLEAN),
349 }
350}
351
352fn dim_type_to_filter_name(dt: &DimType) -> &'static str {
353 match dt {
354 DimType::String => "StringFilter",
355 DimType::Int => "IntFilter",
356 DimType::Float => "FloatFilter",
357 DimType::DateTime => "DateTimeFilter",
358 DimType::Bool => "BoolFilter",
359 }
360}
361
362pub fn json_to_gql_value(v: serde_json::Value) -> Value {
363 match v {
364 serde_json::Value::Null => Value::Null,
365 serde_json::Value::Bool(b) => Value::from(b),
366 serde_json::Value::Number(n) => {
367 if let Some(i) = n.as_i64() { Value::from(i) }
368 else if let Some(f) = n.as_f64() { Value::from(f) }
369 else { Value::from(n.to_string()) }
370 }
371 serde_json::Value::String(s) => Value::from(s),
372 _ => Value::from(v.to_string()),
373 }
374}