Skip to main content

shaperail_runtime/graphql/
schema.rs

1//! Builds a dynamic GraphQL schema from resource definitions and app state (M15).
2
3use std::sync::Arc;
4
5use async_graphql::dynamic::{
6    Field, FieldFuture, InputObject, InputValue, Object, Schema, SchemaBuilder, Subscription,
7    SubscriptionField, SubscriptionFieldFuture, TypeRef,
8};
9use async_graphql::Value;
10use shaperail_core::{
11    EndpointSpec, FieldType, GraphQLConfig, HttpMethod, PaginationStyle, RelationType,
12    ResourceDefinition, ShaperailError,
13};
14
15use crate::auth::rbac;
16use crate::db::{FilterSet, PageRequest, ResourceQuery, SortParam};
17use crate::handlers::crud::{
18    extract_input_from_value, run_write_side_effects, schedule_file_cleanup, store_for_or_error,
19    AppState,
20};
21use crate::handlers::validate::validate_input;
22
23/// Context passed into GraphQL resolvers (state, resources, auth).
24#[derive(Clone)]
25pub struct GqlContext {
26    pub state: Arc<AppState>,
27    pub resources: Vec<ResourceDefinition>,
28    /// Authenticated user from JWT/API key (same as REST).
29    pub user: Option<crate::auth::extractor::AuthenticatedUser>,
30    /// DataLoader for batching/caching relation lookups (N+1 prevention).
31    pub loader: super::dataloader::RelationLoader,
32}
33
34/// Type alias for the dynamic schema (for clarity at call sites).
35pub type GraphQLSchema = Schema;
36
37/// Returns TypeRef for schema fields. Uses only nullable refs so the dynamic
38/// schema resolves base type names (e.g. "String"); named_nn causes lookup of "String!" which fails.
39fn field_type_to_type_ref(ft: &FieldType, _required: bool) -> TypeRef {
40    match ft {
41        FieldType::Uuid => TypeRef::named("String"),
42        FieldType::String | FieldType::Enum | FieldType::File => TypeRef::named("String"),
43        FieldType::Integer => TypeRef::named("Int"),
44        FieldType::Bigint => TypeRef::named("Int"),
45        FieldType::Number => TypeRef::named("Float"),
46        FieldType::Boolean => TypeRef::named("Boolean"),
47        FieldType::Timestamp | FieldType::Date => TypeRef::named("String"),
48        FieldType::Json | FieldType::Array => TypeRef::named("String"),
49    }
50}
51
52/// Converts serde_json::Value to async_graphql::Value for resolver results.
53fn json_to_gql_value(v: &serde_json::Value) -> Value {
54    Value::from_json(v.clone()).unwrap_or(Value::Null)
55}
56
/// GraphQL object type name for a resource: the resource name with its first
/// character upper-cased (e.g. "posts" -> "Posts").
///
/// Only a leading ASCII character is changed. The name is not singularised, and
/// an empty name or one starting with a non-ASCII character is returned as is.
fn object_type_name(resource: &str) -> String {
    match resource.chars().next() {
        Some(first) if first.is_ascii() => {
            let mut name = String::with_capacity(resource.len());
            name.push(first.to_ascii_uppercase());
            // `first` is ASCII, so it occupies exactly one byte.
            name.push_str(&resource[1..]);
            name
        }
        _ => resource.to_string(),
    }
}
65
66/// Builds the Query object with list and get fields for each resource.
67fn build_query_object(resources: &[ResourceDefinition]) -> Object {
68    let mut query = Object::new("Query");
69
70    for resource in resources {
71        let type_name = object_type_name(&resource.resource);
72        let list_type = TypeRef::named_list_nn(type_name.clone());
73        let single_type = TypeRef::named(type_name.clone());
74
75        // list_<resource>(limit, offset)
76        let res = resource.clone();
77        let list_field = Field::new(
78            format!("list_{}", resource.resource),
79            list_type,
80            move |ctx| {
81                let res = res.clone();
82                FieldFuture::new(async move {
83                    let gql = ctx.data::<GqlContext>().map_err(|e| e.message)?;
84                    let endpoint = res
85                        .endpoints
86                        .as_ref()
87                        .and_then(|e| e.get("list"))
88                        .cloned()
89                        .unwrap_or_else(|| EndpointSpec {
90                            method: Some(HttpMethod::Get),
91                            path: Some(format!("/{}", res.resource)),
92                            auth: None,
93                            input: None,
94                            filters: None,
95                            search: None,
96                            pagination: Some(PaginationStyle::Offset),
97                            sort: None,
98                            cache: None,
99                            controller: None,
100                            events: None,
101                            jobs: None,
102                            upload: None,
103                            soft_delete: false,
104                        });
105                    rbac::enforce(endpoint.auth.as_ref(), gql.user.as_ref())
106                        .map_err(|e| e.to_string())?;
107                    let store_opt = store_for_or_error(&gql.state, &res)?;
108                    let limit = ctx
109                        .args
110                        .get("limit")
111                        .and_then(|v| v.i64().ok())
112                        .unwrap_or(25);
113                    let offset = ctx
114                        .args
115                        .get("offset")
116                        .and_then(|v| v.i64().ok())
117                        .unwrap_or(0);
118                    let page = PageRequest::Offset {
119                        offset,
120                        limit: PageRequest::clamped_limit(Some(limit)),
121                    };
122                    let filters = FilterSet::default();
123                    let sort = SortParam::default();
124
125                    let (rows, _meta) = if let Some(store) = store_opt {
126                        store
127                            .find_all(&endpoint, &filters, None, &sort, &page)
128                            .await
129                            .map_err(|e: ShaperailError| e.to_string())?
130                    } else {
131                        let rq = ResourceQuery::new(&res, &gql.state.pool);
132                        rq.find_all(&filters, None, &sort, &page)
133                            .await
134                            .map_err(|e: ShaperailError| e.to_string())?
135                    };
136
137                    let list: Vec<Value> =
138                        rows.into_iter().map(|r| json_to_gql_value(&r.0)).collect();
139                    Ok(Some(Value::List(list)))
140                })
141            },
142        )
143        .argument(async_graphql::dynamic::InputValue::new(
144            "limit",
145            TypeRef::named("Int"),
146        ))
147        .argument(async_graphql::dynamic::InputValue::new(
148            "offset",
149            TypeRef::named("Int"),
150        ));
151
152        query = query.field(list_field);
153
154        // <singular>(id: ID!)
155        let res = resource.clone();
156        let get_field = Field::new(singular_name(&resource.resource), single_type, move |ctx| {
157            let res = res.clone();
158            FieldFuture::new(async move {
159                let id_str = ctx
160                    .args
161                    .get("id")
162                    .and_then(|v| v.string().ok())
163                    .ok_or("id required")?;
164                let id = uuid::Uuid::parse_str(id_str).map_err(|e| e.to_string())?;
165                let gql = ctx.data::<GqlContext>().map_err(|e| e.message)?;
166                let endpoint = res
167                    .endpoints
168                    .as_ref()
169                    .and_then(|e| e.get("get"))
170                    .cloned()
171                    .unwrap_or_else(|| EndpointSpec {
172                        method: Some(HttpMethod::Get),
173                        path: Some(format!("/{}/:id", res.resource)),
174                        auth: None,
175                        input: None,
176                        filters: None,
177                        search: None,
178                        pagination: None,
179                        sort: None,
180                        cache: None,
181                        controller: None,
182                        events: None,
183                        jobs: None,
184                        upload: None,
185                        soft_delete: false,
186                    });
187                rbac::enforce(endpoint.auth.as_ref(), gql.user.as_ref())
188                    .map_err(|e| e.to_string())?;
189                let store_opt = store_for_or_error(&gql.state, &res)?;
190
191                let row = if let Some(store) = store_opt {
192                    store
193                        .find_by_id(&id)
194                        .await
195                        .map_err(|e: ShaperailError| e.to_string())?
196                } else {
197                    let rq = ResourceQuery::new(&res, &gql.state.pool);
198                    rq.find_by_id(&id)
199                        .await
200                        .map_err(|e: ShaperailError| e.to_string())?
201                };
202
203                if rbac::needs_owner_check(endpoint.auth.as_ref(), gql.user.as_ref()) {
204                    if let Some(ref u) = gql.user {
205                        rbac::check_owner(u, &row.0).map_err(|e| e.to_string())?;
206                    }
207                }
208
209                Ok(Some(json_to_gql_value(&row.0)))
210            })
211        })
212        .argument(async_graphql::dynamic::InputValue::new(
213            "id",
214            TypeRef::named("String"),
215        ));
216
217        query = query.field(get_field);
218    }
219
220    query
221}
222
/// Derives the single-record query field name from a resource name by dropping
/// one trailing `s` (e.g. "posts" -> "post").
///
/// Names that do not end in `s`, and the one-character name "s", are returned
/// unchanged. No other pluralisation rules are applied.
fn singular_name(resource: &str) -> String {
    match resource.strip_suffix('s') {
        Some(stem) if !stem.is_empty() => stem.to_string(),
        _ => resource.to_string(),
    }
}
230
231/// Input field names for create/update (from endpoint.input or schema).
232fn input_field_names(resource: &ResourceDefinition, endpoint: &EndpointSpec) -> Vec<String> {
233    if let Some(input_fields) = &endpoint.input {
234        return input_fields.clone();
235    }
236    resource
237        .schema
238        .iter()
239        .filter(|(_, fs)| !fs.generated && !fs.primary)
240        .map(|(name, _)| name.clone())
241        .collect()
242}
243
244/// Builds InputObject for create/update (one per resource that has create or update endpoint).
245fn build_input_objects(resources: &[ResourceDefinition]) -> Vec<InputObject> {
246    let mut out = Vec::new();
247    for resource in resources {
248        let has_create = resource
249            .endpoints
250            .as_ref()
251            .map(|e| e.contains_key("create"))
252            .unwrap_or(false);
253        let has_update = resource
254            .endpoints
255            .as_ref()
256            .map(|e| e.contains_key("update"))
257            .unwrap_or(false);
258        if !has_create && !has_update {
259            continue;
260        }
261        let endpoint = resource
262            .endpoints
263            .as_ref()
264            .and_then(|e| e.get("create").or_else(|| e.get("update")))
265            .cloned()
266            .unwrap_or_else(|| EndpointSpec {
267                method: Some(HttpMethod::Post),
268                path: Some(format!("/{}", resource.resource)),
269                auth: None,
270                input: None,
271                filters: None,
272                search: None,
273                pagination: None,
274                sort: None,
275                cache: None,
276                controller: None,
277                events: None,
278                jobs: None,
279                upload: None,
280                soft_delete: false,
281            });
282        let type_name = object_type_name(&resource.resource);
283        let input_name = format!("{}Input", type_name);
284        let fields = input_field_names(resource, &endpoint);
285        let mut input_obj = InputObject::new(input_name.clone());
286        for field_name in &fields {
287            if let Some(fs) = resource.schema.get(field_name) {
288                let ty = field_type_to_type_ref(&fs.field_type, false);
289                input_obj = input_obj.field(InputValue::new(field_name.clone(), ty));
290            }
291        }
292        out.push(input_obj);
293    }
294    out
295}
296
297/// Builds the Mutation object with create, update, delete for each resource.
298fn build_mutation_object(resources: &[ResourceDefinition]) -> Object {
299    let mut mutation = Object::new("Mutation");
300
301    for resource in resources {
302        let type_name = object_type_name(&resource.resource);
303        let single_type = TypeRef::named(type_name.clone());
304        let input_type_name = format!("{}Input", type_name);
305
306        if resource
307            .endpoints
308            .as_ref()
309            .map(|e| e.contains_key("create"))
310            .unwrap_or(false)
311        {
312            let res = resource.clone();
313            let create_field = Field::new(
314                format!("create_{}", resource.resource),
315                single_type.clone(),
316                move |ctx| {
317                    let res = res.clone();
318                    FieldFuture::new(async move {
319                        let gql = ctx.data::<GqlContext>().map_err(|e| e.message)?;
320                        let endpoint = res
321                            .endpoints
322                            .as_ref()
323                            .and_then(|e| e.get("create"))
324                            .cloned()
325                            .ok_or("create endpoint missing")?;
326                        rbac::enforce(endpoint.auth.as_ref(), gql.user.as_ref())
327                            .map_err(|e| e.to_string())?;
328                        let input_accessor = ctx.args.try_get("input").map_err(|e| e.message)?;
329                        let json_val = input_accessor
330                            .as_value()
331                            .clone()
332                            .into_json()
333                            .map_err(|e| e.to_string())?;
334                        let input_data = extract_input_from_value(&json_val, &res, &endpoint)
335                            .map_err(|e| e.to_string())?;
336                        validate_input(&input_data, &res).map_err(|e| e.to_string())?;
337                        let store_opt = store_for_or_error(&gql.state, &res)?;
338                        let row = if let Some(store) = store_opt {
339                            store
340                                .insert(&input_data)
341                                .await
342                                .map_err(|e: ShaperailError| e.to_string())?
343                        } else {
344                            let rq = ResourceQuery::new(&res, &gql.state.pool);
345                            rq.insert(&input_data)
346                                .await
347                                .map_err(|e: ShaperailError| e.to_string())?
348                        };
349                        run_write_side_effects(&gql.state, &res, &endpoint, "created", &row.0)
350                            .await;
351                        Ok(Some(json_to_gql_value(&row.0)))
352                    })
353                },
354            )
355            .argument(InputValue::new(
356                "input",
357                TypeRef::named(input_type_name.clone()),
358            ));
359            mutation = mutation.field(create_field);
360        }
361
362        if resource
363            .endpoints
364            .as_ref()
365            .map(|e| e.contains_key("update"))
366            .unwrap_or(false)
367        {
368            let res = resource.clone();
369            let update_field = Field::new(
370                format!("update_{}", resource.resource),
371                single_type.clone(),
372                move |ctx| {
373                    let res = res.clone();
374                    FieldFuture::new(async move {
375                        let gql = ctx.data::<GqlContext>().map_err(|e| e.message)?;
376                        let endpoint = res
377                            .endpoints
378                            .as_ref()
379                            .and_then(|e| e.get("update"))
380                            .cloned()
381                            .ok_or("update endpoint missing")?;
382                        rbac::enforce(endpoint.auth.as_ref(), gql.user.as_ref())
383                            .map_err(|e| e.to_string())?;
384                        let id_str = ctx
385                            .args
386                            .get("id")
387                            .and_then(|v| v.string().ok())
388                            .ok_or("id required")?;
389                        let id = uuid::Uuid::parse_str(id_str).map_err(|e| e.to_string())?;
390                        let store_opt = store_for_or_error(&gql.state, &res)?;
391                        if rbac::needs_owner_check(endpoint.auth.as_ref(), gql.user.as_ref()) {
392                            let existing = if let Some(store) = &store_opt {
393                                store
394                                    .find_by_id(&id)
395                                    .await
396                                    .map_err(|e: ShaperailError| e.to_string())?
397                            } else {
398                                let rq = ResourceQuery::new(&res, &gql.state.pool);
399                                rq.find_by_id(&id)
400                                    .await
401                                    .map_err(|e: ShaperailError| e.to_string())?
402                            };
403                            if let Some(ref u) = gql.user {
404                                rbac::check_owner(u, &existing.0).map_err(|e| e.to_string())?;
405                            }
406                        }
407                        let input_accessor = ctx.args.try_get("input").map_err(|e| e.message)?;
408                        let json_val = input_accessor
409                            .as_value()
410                            .clone()
411                            .into_json()
412                            .map_err(|e| e.to_string())?;
413                        let input_data = extract_input_from_value(&json_val, &res, &endpoint)
414                            .map_err(|e| e.to_string())?;
415                        validate_input(&input_data, &res).map_err(|e| e.to_string())?;
416                        let row = if let Some(store) = store_opt {
417                            store
418                                .update_by_id(&id, &input_data)
419                                .await
420                                .map_err(|e: ShaperailError| e.to_string())?
421                        } else {
422                            let rq = ResourceQuery::new(&res, &gql.state.pool);
423                            rq.update_by_id(&id, &input_data)
424                                .await
425                                .map_err(|e: ShaperailError| e.to_string())?
426                        };
427                        run_write_side_effects(&gql.state, &res, &endpoint, "updated", &row.0)
428                            .await;
429                        Ok(Some(json_to_gql_value(&row.0)))
430                    })
431                },
432            )
433            .argument(InputValue::new("id", TypeRef::named("String")))
434            .argument(InputValue::new(
435                "input",
436                TypeRef::named(input_type_name.clone()),
437            ));
438            mutation = mutation.field(update_field);
439        }
440
441        if resource
442            .endpoints
443            .as_ref()
444            .map(|e| e.contains_key("delete"))
445            .unwrap_or(false)
446        {
447            let res = resource.clone();
448            let endpoint = resource
449                .endpoints
450                .as_ref()
451                .and_then(|e| e.get("delete"))
452                .cloned()
453                .unwrap_or_else(|| EndpointSpec {
454                    method: Some(HttpMethod::Delete),
455                    path: Some(format!("/{}/:id", resource.resource)),
456                    auth: None,
457                    input: None,
458                    filters: None,
459                    search: None,
460                    pagination: None,
461                    sort: None,
462                    cache: None,
463                    controller: None,
464                    events: None,
465                    jobs: None,
466                    upload: None,
467                    soft_delete: true,
468                });
469            let delete_field = Field::new(
470                format!("delete_{}", resource.resource),
471                single_type,
472                move |ctx| {
473                    let res = res.clone();
474                    let endpoint = endpoint.clone();
475                    FieldFuture::new(async move {
476                        let gql = ctx.data::<GqlContext>().map_err(|e| e.message)?;
477                        rbac::enforce(endpoint.auth.as_ref(), gql.user.as_ref())
478                            .map_err(|e| e.to_string())?;
479                        let id_str = ctx
480                            .args
481                            .get("id")
482                            .and_then(|v| v.string().ok())
483                            .ok_or("id required")?;
484                        let id = uuid::Uuid::parse_str(id_str).map_err(|e| e.to_string())?;
485                        let store_opt = store_for_or_error(&gql.state, &res)?;
486                        if rbac::needs_owner_check(endpoint.auth.as_ref(), gql.user.as_ref()) {
487                            let existing = if let Some(store) = &store_opt {
488                                store
489                                    .find_by_id(&id)
490                                    .await
491                                    .map_err(|e: ShaperailError| e.to_string())?
492                            } else {
493                                let rq = ResourceQuery::new(&res, &gql.state.pool);
494                                rq.find_by_id(&id)
495                                    .await
496                                    .map_err(|e: ShaperailError| e.to_string())?
497                            };
498                            if let Some(ref u) = gql.user {
499                                rbac::check_owner(u, &existing.0).map_err(|e| e.to_string())?;
500                            }
501                        }
502                        let (return_data, deleted_data) = if endpoint.soft_delete {
503                            let row = if let Some(store) = store_opt {
504                                store
505                                    .soft_delete_by_id(&id)
506                                    .await
507                                    .map_err(|e: ShaperailError| e.to_string())?
508                            } else {
509                                let rq = ResourceQuery::new(&res, &gql.state.pool);
510                                rq.soft_delete_by_id(&id)
511                                    .await
512                                    .map_err(|e: ShaperailError| e.to_string())?
513                            };
514                            let data = row.0.clone();
515                            (data.clone(), data)
516                        } else {
517                            let row = if let Some(store) = store_opt {
518                                store
519                                    .hard_delete_by_id(&id)
520                                    .await
521                                    .map_err(|e: ShaperailError| e.to_string())?
522                            } else {
523                                let rq = ResourceQuery::new(&res, &gql.state.pool);
524                                rq.hard_delete_by_id(&id)
525                                    .await
526                                    .map_err(|e: ShaperailError| e.to_string())?
527                            };
528                            let data = row.0.clone();
529                            (data.clone(), data)
530                        };
531                        if !endpoint.soft_delete {
532                            schedule_file_cleanup(&res, &deleted_data);
533                        }
534                        run_write_side_effects(
535                            &gql.state,
536                            &res,
537                            &endpoint,
538                            "deleted",
539                            &deleted_data,
540                        )
541                        .await;
542                        Ok(Some(json_to_gql_value(&return_data)))
543                    })
544                },
545            )
546            .argument(InputValue::new("id", TypeRef::named("String")));
547            mutation = mutation.field(delete_field);
548        }
549    }
550
551    mutation
552}
553
554/// Builds one Object type per resource with schema fields and relation fields.
555fn build_resource_objects(resources: &[ResourceDefinition]) -> Vec<Object> {
556    let mut objects = Vec::new();
557    let resources_ref = resources;
558
559    for resource in resources_ref {
560        let type_name = object_type_name(&resource.resource);
561        let mut obj = Object::new(type_name.clone());
562
563        for (field_name, field_schema) in &resource.schema {
564            let field_type =
565                field_type_to_type_ref(&field_schema.field_type, field_schema.required);
566            let name = field_name.clone();
567            let field = Field::new(name.clone(), field_type, move |ctx| {
568                let name = name.clone();
569                FieldFuture::new(async move {
570                    let parent = ctx.parent_value.try_to_value().map_err(|e| e.message)?;
571                    let val = match parent {
572                        Value::Object(map) => map
573                            .iter()
574                            .find(|(k, _)| k.as_str() == name.as_str())
575                            .map(|(_, v)| v.clone())
576                            .unwrap_or(Value::Null),
577                        _ => Value::Null,
578                    };
579                    Ok(Some(val))
580                })
581            });
582            obj = obj.field(field);
583        }
584
585        if let Some(relations) = &resource.relations {
586            for (relation_name, relation) in relations {
587                let related_type_name = object_type_name(&relation.resource);
588                let field_ty = match relation.relation_type {
589                    RelationType::HasMany => TypeRef::named_list(related_type_name.clone()),
590                    RelationType::BelongsTo | RelationType::HasOne => {
591                        TypeRef::named(related_type_name.clone())
592                    }
593                };
594                let res = resource.clone();
595                let rel_name = relation_name.clone();
596                let rel = relation.clone();
597                let field = Field::new(rel_name.clone(), field_ty, move |ctx| {
598                    let res = res.clone();
599                    let rel = rel.clone();
600                    let rel_name = rel_name.clone();
601                    FieldFuture::new(async move {
602                        let gql = ctx.data::<GqlContext>().map_err(|e| e.message)?;
603                        let parent = ctx.parent_value.try_to_value().map_err(|e| e.message)?;
604                        let loader = &gql.loader;
605                        match rel.relation_type {
606                            RelationType::BelongsTo => {
607                                let key = rel
608                                    .key
609                                    .clone()
610                                    .unwrap_or_else(|| format!("{}_id", rel_name));
611                                let fk_str = match &parent {
612                                    Value::Object(map) => map
613                                        .iter()
614                                        .find(|(k, _)| k.as_str() == key.as_str())
615                                        .and_then(|(_, v)| match v {
616                                            Value::String(s) => Some(s.as_str()),
617                                            _ => None,
618                                        }),
619                                    _ => None,
620                                };
621                                let Some(fk_str) = fk_str else {
622                                    return Ok(Some(Value::Null));
623                                };
624                                let fk =
625                                    uuid::Uuid::parse_str(fk_str).map_err(|e| e.to_string())?;
626                                let row = loader
627                                    .load_by_id(&rel.resource, &fk)
628                                    .await
629                                    .map_err(|e: ShaperailError| e.to_string())?;
630                                match row {
631                                    Some(r) => Ok(Some(json_to_gql_value(&r.0))),
632                                    None => Ok(Some(Value::Null)),
633                                }
634                            }
635                            RelationType::HasMany | RelationType::HasOne => {
636                                let pk = res
637                                    .schema
638                                    .iter()
639                                    .find(|(_, fs)| fs.primary)
640                                    .map(|(n, _)| n.as_str())
641                                    .unwrap_or("id");
642                                let parent_id = match &parent {
643                                    Value::Object(map) => map
644                                        .iter()
645                                        .find(|(k, _)| k.as_str() == pk)
646                                        .and_then(|(_, v)| match v {
647                                            Value::String(s) => Some(s.as_str()),
648                                            _ => None,
649                                        }),
650                                    _ => None,
651                                };
652                                let Some(id_str) = parent_id else {
653                                    return Ok(Some(Value::Null));
654                                };
655                                let fk = rel.foreign_key.as_deref().unwrap_or("id");
656                                let rows = loader
657                                    .load_by_filter(&rel.resource, fk, id_str)
658                                    .await
659                                    .map_err(|e: ShaperailError| e.to_string())?;
660                                let list: Vec<Value> =
661                                    rows.into_iter().map(|r| json_to_gql_value(&r.0)).collect();
662                                if rel.relation_type == RelationType::HasOne {
663                                    Ok(Some(list.into_iter().next().unwrap_or(Value::Null)))
664                                } else {
665                                    Ok(Some(Value::List(list)))
666                                }
667                            }
668                        }
669                    })
670                });
671                obj = obj.field(field);
672            }
673        }
674
675        objects.push(obj);
676    }
677
678    objects
679}
680
681/// Builds the Subscription object from declared WebSocket events on resources.
682///
683/// For each resource, if endpoints declare `events`, a subscription field is created
684/// for each event (e.g., `user_created`, `order_updated`). The subscription streams
685/// events via a broadcast channel.
686fn build_subscription_object(resources: &[ResourceDefinition]) -> Subscription {
687    let mut subscription = Subscription::new("Subscription");
688
689    for resource in resources {
690        let type_name = object_type_name(&resource.resource);
691        let endpoints = match &resource.endpoints {
692            Some(eps) => eps,
693            None => continue,
694        };
695
696        for (_ep_name, endpoint) in endpoints {
697            let events = match &endpoint.events {
698                Some(events) => events,
699                None => continue,
700            };
701
702            for event_name in events {
703                let event = event_name.clone();
704                let sub_name = event.replace('.', "_");
705                let field = SubscriptionField::new(
706                    sub_name,
707                    TypeRef::named(type_name.clone()),
708                    move |ctx| {
709                        let event = event.clone();
710                        SubscriptionFieldFuture::new(async move {
711                            let gql = ctx.data::<GqlContext>().map_err(|e| e.message)?;
712                            // Create a broadcast receiver from the event emitter.
713                            let rx = gql.state.event_bus_subscribe(&event);
714                            let stream = futures_util::stream::unfold(rx, |mut rx| async move {
715                                match rx.recv().await {
716                                    Ok(payload) => {
717                                        let val = json_to_gql_value(&payload);
718                                        Some((Ok(val), rx))
719                                    }
720                                    Err(_) => None,
721                                }
722                            });
723                            Ok(stream)
724                        })
725                    },
726                );
727                subscription = subscription.field(field);
728            }
729        }
730    }
731
732    subscription
733}
734
735/// Builds the full GraphQL schema from resources and app state.
736///
737/// If `gql_config` is provided, depth and complexity limits are taken from it.
738/// Otherwise, defaults are used (depth: 16, complexity: 256).
739pub fn build_schema(
740    resources: &[ResourceDefinition],
741    _state: Arc<AppState>,
742) -> Result<GraphQLSchema, ShaperailError> {
743    build_schema_with_config(resources, _state, None)
744}
745
746/// Builds the full GraphQL schema with configurable depth and complexity limits.
747pub fn build_schema_with_config(
748    resources: &[ResourceDefinition],
749    _state: Arc<AppState>,
750    gql_config: Option<&GraphQLConfig>,
751) -> Result<GraphQLSchema, ShaperailError> {
752    let depth_limit = gql_config.map(|c| c.depth_limit).unwrap_or(16);
753    let complexity_limit = gql_config.map(|c| c.complexity_limit).unwrap_or(256);
754
755    let query = build_query_object(resources);
756    let mutation = build_mutation_object(resources);
757    let subscription = build_subscription_object(resources);
758    let resource_objects = build_resource_objects(resources);
759    let input_objects = build_input_objects(resources);
760
761    let mut builder: SchemaBuilder = Schema::build("Query", Some("Mutation"), Some("Subscription"))
762        .register(query)
763        .register(mutation)
764        .register(subscription)
765        .limit_depth(depth_limit)
766        .limit_complexity(complexity_limit);
767
768    for obj in input_objects {
769        builder = builder.register(obj);
770    }
771    for obj in resource_objects {
772        builder = builder.register(obj);
773    }
774
775    builder
776        .finish()
777        .map_err(|e| ShaperailError::Internal(format!("GraphQL schema build failed: {e}")))
778}