Skip to main content

apollo_router/
router_factory.rs

1use std::collections::HashMap;
2use std::io;
3use std::sync::Arc;
4
5use apollo_compiler::validation::Valid;
6use axum::response::IntoResponse;
7use http::StatusCode;
8use indexmap::IndexMap;
9use multimap::MultiMap;
10use rustls::RootCertStore;
11use serde_json::Map;
12use serde_json::Value;
13use tower::BoxError;
14use tower::ServiceBuilder;
15use tower::ServiceExt;
16use tower::service_fn;
17use tower_service::Service;
18use tracing::Instrument;
19
20use crate::ListenAddr;
21use crate::configuration::APOLLO_PLUGIN_PREFIX;
22use crate::configuration::Configuration;
23use crate::configuration::ConfigurationError;
24use crate::configuration::TlsClient;
25use crate::plugin::DynPlugin;
26use crate::plugin::Handler;
27use crate::plugin::PluginFactory;
28use crate::plugin::PluginInit;
29use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN;
30use crate::plugins::subscription::Subscription;
31use crate::plugins::telemetry::reload::apollo_opentelemetry_initialized;
32use crate::plugins::traffic_shaping::APOLLO_TRAFFIC_SHAPING;
33use crate::plugins::traffic_shaping::TrafficShaping;
34use crate::query_planner::QueryPlannerService;
35use crate::services::HasSchema;
36use crate::services::PluggableSupergraphServiceBuilder;
37use crate::services::Plugins;
38use crate::services::SubgraphService;
39use crate::services::SupergraphCreator;
40use crate::services::apollo_graph_reference;
41use crate::services::apollo_key;
42use crate::services::http::HttpClientServiceFactory;
43use crate::services::layers::persisted_queries::PersistedQueryLayer;
44use crate::services::layers::query_analysis::QueryAnalysisLayer;
45use crate::services::new_service::ServiceFactory;
46use crate::services::router;
47use crate::services::router::pipeline_handle::PipelineRef;
48use crate::services::router::service::RouterCreator;
49use crate::services::subgraph;
50use crate::services::transport;
51use crate::spec::Schema;
52
53pub(crate) const STARTING_SPAN_NAME: &str = "starting";
54
55#[derive(Clone)]
56/// A path and a handler to be exposed as a web_endpoint for plugins
57pub struct Endpoint {
58    pub(crate) path: String,
59    // Plugins need to be Send + Sync
60    // BoxCloneService isn't enough
61    handler: Handler,
62}
63
64impl std::fmt::Debug for Endpoint {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("Endpoint")
67            .field("path", &self.path)
68            .finish()
69    }
70}
71
72impl Endpoint {
73    /// Creates an Endpoint given a path and a Boxed Service
74    #[deprecated = "use `from_router_service` instead"]
75    #[allow(deprecated)]
76    pub fn new(path: String, handler: transport::BoxService) -> Self {
77        let router_service = ServiceBuilder::new()
78            .map_request(|request: router::Request| request.router_request)
79            .map_response(|response: transport::Response| response.into())
80            .service(handler)
81            .boxed();
82        Self {
83            path,
84            handler: Handler::new(router_service),
85        }
86    }
87
88    /// Creates an Endpoint given a path and a Boxed Service
89    pub fn from_router_service(path: String, handler: router::BoxService) -> Self {
90        Self {
91            path,
92            handler: Handler::new(handler),
93        }
94    }
95    pub(crate) fn into_router(self) -> axum::Router {
96        let handler = move |req: http::Request<crate::services::router::Body>| {
97            let endpoint = self.handler.clone();
98            async move {
99                Ok(endpoint
100                    .oneshot(req.into())
101                    .await
102                    .map(|res| res.response)
103                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
104                    .into_response())
105            }
106        };
107        axum::Router::new().route_service(self.path.as_str(), service_fn(handler))
108    }
109}
110/// Factory for creating a RouterService
111///
112/// Instances of this traits are used by the HTTP server to generate a new
113/// RouterService on each request
114pub(crate) trait RouterFactory:
115    ServiceFactory<router::Request, Service = Self::RouterService> + Clone + Send + Sync + 'static
116{
117    type RouterService: Service<
118            router::Request,
119            Response = router::Response,
120            Error = BoxError,
121            Future = Self::Future,
122        > + Send;
123    type Future: Send;
124
125    fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint>;
126
127    fn pipeline_ref(&self) -> Arc<PipelineRef>;
128}
129
130/// Factory for creating a RouterFactory
131///
132/// Instances of this traits are used by the StateMachine to generate a new
133/// RouterFactory from configuration when it changes
134#[async_trait::async_trait]
135pub(crate) trait RouterSuperServiceFactory: Send + Sync + 'static {
136    type RouterFactory: RouterFactory;
137
138    async fn create<'a>(
139        &'a mut self,
140        is_telemetry_disabled: bool,
141        configuration: Arc<Configuration>,
142        schema: Arc<Schema>,
143        previous_router: Option<&'a Self::RouterFactory>,
144        extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
145    ) -> Result<Self::RouterFactory, BoxError>;
146}
147
148/// Main implementation of the SupergraphService factory, supporting the extensions system
149#[derive(Default)]
150pub(crate) struct YamlRouterFactory;
151
152#[async_trait::async_trait]
153impl RouterSuperServiceFactory for YamlRouterFactory {
154    type RouterFactory = RouterCreator;
155
156    async fn create<'a>(
157        &'a mut self,
158        _is_telemetry_disabled: bool,
159        configuration: Arc<Configuration>,
160        schema: Arc<Schema>,
161        previous_router: Option<&'a Self::RouterFactory>,
162        extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
163    ) -> Result<Self::RouterFactory, BoxError> {
164        // we have to create a telemetry plugin before creating everything else, to generate a trace
165        // of router and plugin creation
166        let plugin_registry = &*crate::plugin::PLUGINS;
167        let mut initial_telemetry_plugin = None;
168
169        if previous_router.is_none() && apollo_opentelemetry_initialized() {
170            if let Some(factory) = plugin_registry
171                .iter()
172                .find(|factory| factory.name == "apollo.telemetry")
173            {
174                let mut telemetry_config = configuration
175                    .apollo_plugins
176                    .plugins
177                    .get("telemetry")
178                    .cloned();
179                if let Some(plugin_config) = &mut telemetry_config {
180                    inject_schema_id(schema.schema_id.as_str(), plugin_config);
181                    match factory
182                        .create_instance(
183                            PluginInit::builder()
184                                .config(plugin_config.clone())
185                                .supergraph_sdl(schema.raw_sdl.clone())
186                                .supergraph_schema_id(schema.schema_id.clone().into_inner())
187                                .supergraph_schema(Arc::new(schema.supergraph_schema().clone()))
188                                .notify(configuration.notify.clone())
189                                .build(),
190                        )
191                        .await
192                    {
193                        Ok(plugin) => {
194                            if let Some(telemetry) = plugin
195                                .as_any()
196                                .downcast_ref::<crate::plugins::telemetry::Telemetry>(
197                            ) {
198                                telemetry.activate();
199                            }
200                            initial_telemetry_plugin = Some(plugin);
201                        }
202                        Err(e) => return Err(e),
203                    }
204                }
205            }
206        }
207
208        let router_span = tracing::info_span!(STARTING_SPAN_NAME);
209        Self.inner_create(
210            configuration,
211            schema,
212            previous_router,
213            initial_telemetry_plugin,
214            extra_plugins,
215        )
216        .instrument(router_span)
217        .await
218    }
219}
220
221impl YamlRouterFactory {
222    async fn inner_create<'a>(
223        &'a mut self,
224        configuration: Arc<Configuration>,
225        schema: Arc<Schema>,
226        previous_router: Option<&'a RouterCreator>,
227        initial_telemetry_plugin: Option<Box<dyn DynPlugin>>,
228        extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
229    ) -> Result<RouterCreator, BoxError> {
230        let mut supergraph_creator = self
231            .inner_create_supergraph(
232                configuration.clone(),
233                schema,
234                initial_telemetry_plugin,
235                extra_plugins,
236            )
237            .await?;
238
239        // Instantiate the parser here so we can use it to warm up the planner below
240        let query_analysis_layer =
241            QueryAnalysisLayer::new(supergraph_creator.schema(), Arc::clone(&configuration)).await;
242
243        let persisted_query_layer = Arc::new(PersistedQueryLayer::new(&configuration).await?);
244
245        if let Some(previous_router) = previous_router {
246            let previous_cache = previous_router.previous_cache();
247
248            supergraph_creator
249                .warm_up_query_planner(
250                    &query_analysis_layer,
251                    &persisted_query_layer,
252                    Some(previous_cache),
253                    configuration.supergraph.query_planning.warmed_up_queries,
254                    configuration
255                        .supergraph
256                        .query_planning
257                        .experimental_reuse_query_plans,
258                    &configuration
259                        .persisted_queries
260                        .experimental_prewarm_query_plan_cache,
261                )
262                .await;
263        } else {
264            supergraph_creator
265                .warm_up_query_planner(
266                    &query_analysis_layer,
267                    &persisted_query_layer,
268                    None,
269                    configuration.supergraph.query_planning.warmed_up_queries,
270                    configuration
271                        .supergraph
272                        .query_planning
273                        .experimental_reuse_query_plans,
274                    &configuration
275                        .persisted_queries
276                        .experimental_prewarm_query_plan_cache,
277                )
278                .await;
279        };
280        RouterCreator::new(
281            query_analysis_layer,
282            persisted_query_layer,
283            Arc::new(supergraph_creator),
284            configuration,
285        )
286        .await
287    }
288
289    pub(crate) async fn inner_create_supergraph(
290        &mut self,
291        configuration: Arc<Configuration>,
292        schema: Arc<Schema>,
293        initial_telemetry_plugin: Option<Box<dyn DynPlugin>>,
294        extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
295    ) -> Result<SupergraphCreator, BoxError> {
296        let query_planner_span = tracing::info_span!("query_planner_creation");
297        // QueryPlannerService takes an UnplannedRequest and outputs PlannedRequest
298        let planner = QueryPlannerService::new(schema.clone(), configuration.clone())
299            .instrument(query_planner_span)
300            .await?;
301
302        let span = tracing::info_span!("plugins");
303
304        // Process the plugins.
305        let subgraph_schemas = Arc::new(
306            planner
307                .subgraph_schemas()
308                .iter()
309                .map(|(k, v)| (k.clone(), v.schema.clone()))
310                .collect(),
311        );
312
313        let plugins: Arc<Plugins> = Arc::new(
314            create_plugins(
315                &configuration,
316                &schema,
317                subgraph_schemas,
318                initial_telemetry_plugin,
319                extra_plugins,
320            )
321            .instrument(span)
322            .await?
323            .into_iter()
324            .collect(),
325        );
326
327        async {
328            let mut builder = PluggableSupergraphServiceBuilder::new(planner);
329            builder = builder.with_configuration(configuration.clone());
330            let subgraph_services =
331                create_subgraph_services(&plugins, &schema, &configuration).await?;
332            for (name, subgraph_service) in subgraph_services {
333                builder = builder.with_subgraph_service(&name, subgraph_service);
334            }
335
336            // Final creation after this line we must NOT fail to go live with the new router from this point as some plugins may interact with globals.
337            let supergraph_creator = builder.with_plugins(plugins).build().await?;
338
339            Ok(supergraph_creator)
340        }
341        .instrument(tracing::info_span!("supergraph_creation"))
342        .await
343    }
344}
345
346pub(crate) async fn create_subgraph_services(
347    plugins: &Arc<Plugins>,
348    schema: &Schema,
349    configuration: &Configuration,
350) -> Result<
351    IndexMap<
352        String,
353        impl Service<
354            subgraph::Request,
355            Response = subgraph::Response,
356            Error = BoxError,
357            Future = crate::plugins::traffic_shaping::TrafficShapingSubgraphFuture<SubgraphService>,
358        > + Clone
359        + Send
360        + Sync
361        + 'static,
362    >,
363    BoxError,
364> {
365    let tls_root_store: RootCertStore = configuration
366        .tls
367        .subgraph
368        .all
369        .create_certificate_store()
370        .transpose()?
371        .unwrap_or_else(crate::services::http::HttpClientService::native_roots_store);
372
373    let subscription_plugin_conf = plugins
374        .iter()
375        .find(|i| i.0.as_str() == APOLLO_SUBSCRIPTION_PLUGIN)
376        .and_then(|plugin| (*plugin.1).as_any().downcast_ref::<Subscription>())
377        .map(|p| p.config.clone());
378
379    let shaping = plugins
380        .iter()
381        .find(|i| i.0.as_str() == APOLLO_TRAFFIC_SHAPING)
382        .and_then(|plugin| (*plugin.1).as_any().downcast_ref::<TrafficShaping>())
383        .expect("traffic shaping should always be part of the plugin list");
384
385    let mut subgraph_services = IndexMap::default();
386    for (name, _) in schema.subgraphs() {
387        let http_service = crate::services::http::HttpClientService::from_config(
388            name,
389            configuration,
390            &tls_root_store,
391            shaping.subgraph_client_config(name),
392        )?;
393
394        let http_service_factory = HttpClientServiceFactory::new(http_service, plugins.clone());
395
396        let subgraph_service = shaping.subgraph_service_internal(
397            name,
398            SubgraphService::from_config(
399                name,
400                configuration,
401                subscription_plugin_conf.clone(),
402                http_service_factory,
403            )?,
404        );
405        subgraph_services.insert(name.clone(), subgraph_service);
406    }
407
408    Ok(subgraph_services)
409}
410
411impl TlsClient {
412    pub(crate) fn create_certificate_store(
413        &self,
414    ) -> Option<Result<RootCertStore, ConfigurationError>> {
415        self.certificate_authorities
416            .as_deref()
417            .map(create_certificate_store)
418    }
419}
420
421pub(crate) fn create_certificate_store(
422    certificate_authorities: &str,
423) -> Result<RootCertStore, ConfigurationError> {
424    let mut store = RootCertStore::empty();
425    let certificates = load_certs(certificate_authorities).map_err(|e| {
426        ConfigurationError::CertificateAuthorities {
427            error: format!("could not parse the certificate list: {e}"),
428        }
429    })?;
430    for certificate in certificates {
431        store
432            .add(&certificate)
433            .map_err(|e| ConfigurationError::CertificateAuthorities {
434                error: format!("could not add certificate to root store: {e}"),
435            })?;
436    }
437    if store.is_empty() {
438        Err(ConfigurationError::CertificateAuthorities {
439            error: "the certificate list is empty".to_string(),
440        })
441    } else {
442        Ok(store)
443    }
444}
445
446fn load_certs(certificates: &str) -> io::Result<Vec<rustls::Certificate>> {
447    tracing::debug!("loading root certificates");
448
449    // Load and return certificate.
450    let certs = rustls_pemfile::certs(&mut certificates.as_bytes()).map_err(|_| {
451        io::Error::new(
452            io::ErrorKind::Other,
453            "failed to load certificate".to_string(),
454        )
455    })?;
456    Ok(certs.into_iter().map(rustls::Certificate).collect())
457}
458
459/// test only helper method to create a router factory in integration tests
460///
461/// not meant to be used directly
462pub async fn create_test_service_factory_from_yaml(schema: &str, configuration: &str) {
463    let config: Configuration = serde_yaml::from_str(configuration).unwrap();
464    let schema = Arc::new(Schema::parse(schema, &config).unwrap());
465
466    let is_telemetry_disabled = false;
467    let service = YamlRouterFactory
468        .create(is_telemetry_disabled, Arc::new(config), schema, None, None)
469        .await;
470    assert_eq!(
471        service.map(|_| ()).unwrap_err().to_string().as_str(),
472        r#"failed to initialize the query planner: An internal error has occurred, please report this bug to Apollo.
473
474Details: Object field "Product.reviews"'s inner type "Review" does not refer to an existing output type."#
475    );
476}
477
478#[allow(clippy::too_many_arguments)]
479pub(crate) async fn add_plugin(
480    name: String,
481    factory: &PluginFactory,
482    plugin_config: &Value,
483    schema: Arc<String>,
484    schema_id: Arc<String>,
485    supergraph_schema: Arc<Valid<apollo_compiler::Schema>>,
486    subgraph_schemas: Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
487    launch_id: Option<Arc<String>>,
488    notify: &crate::notification::Notify<String, crate::graphql::Response>,
489    plugin_instances: &mut Plugins,
490    errors: &mut Vec<ConfigurationError>,
491) {
492    match factory
493        .create_instance(
494            PluginInit::builder()
495                .config(plugin_config.clone())
496                .supergraph_sdl(schema)
497                .supergraph_schema_id(schema_id)
498                .supergraph_schema(supergraph_schema)
499                .subgraph_schemas(subgraph_schemas)
500                .launch_id(launch_id)
501                .notify(notify.clone())
502                .build(),
503        )
504        .await
505    {
506        Ok(plugin) => {
507            let _ = plugin_instances.insert(name, plugin);
508        }
509        Err(err) => errors.push(ConfigurationError::PluginConfiguration {
510            plugin: name,
511            error: err.to_string(),
512        }),
513    }
514}
515
516pub(crate) async fn create_plugins(
517    configuration: &Configuration,
518    schema: &Schema,
519    subgraph_schemas: Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
520    initial_telemetry_plugin: Option<Box<dyn DynPlugin>>,
521    extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
522) -> Result<Plugins, BoxError> {
523    let supergraph_schema = Arc::new(schema.supergraph_schema().clone());
524    let supergraph_schema_id = schema.schema_id.clone().into_inner();
525    let mut apollo_plugins_config = configuration.apollo_plugins.clone().plugins;
526    let user_plugins_config = configuration.plugins.clone().plugins.unwrap_or_default();
527    let extra = extra_plugins.unwrap_or_default();
528    let plugin_registry = &*crate::plugin::PLUGINS;
529    let apollo_telemetry_plugin_mandatory = apollo_opentelemetry_initialized();
530    let mut apollo_plugin_factories: HashMap<&str, &PluginFactory> = plugin_registry
531        .iter()
532        .filter(|factory| {
533            // the name starts with apollo
534            factory.name.starts_with(APOLLO_PLUGIN_PREFIX)
535                && (
536                    // the plugin is mandatory
537                    apollo_telemetry_plugin_mandatory ||
538                    // the name isn't apollo.telemetry
539                    factory.name != "apollo.telemetry"
540                )
541        })
542        .map(|factory| (factory.name.as_str(), &**factory))
543        .collect();
544    let mut errors = Vec::new();
545    let mut plugin_instances = Plugins::default();
546
547    // Use function-like macros to avoid borrow conflicts of captures
548    macro_rules! add_plugin {
549        ($name: expr, $factory: expr, $plugin_config: expr) => {{
550            add_plugin(
551                $name,
552                $factory,
553                &$plugin_config,
554                schema.as_string().clone(),
555                supergraph_schema_id.clone(),
556                supergraph_schema.clone(),
557                subgraph_schemas.clone(),
558                schema.launch_id.clone(),
559                &configuration.notify.clone(),
560                &mut plugin_instances,
561                &mut errors,
562            )
563            .await;
564        }};
565    }
566
567    macro_rules! add_apollo_plugin {
568        ($name: literal, $opt_plugin_config: expr) => {{
569            let name = concat!("apollo.", $name);
570            let span = tracing::info_span!(concat!("plugin: ", "apollo.", $name));
571            async {
572                let factory = apollo_plugin_factories
573                    .remove(name)
574                    .unwrap_or_else(|| panic!("Apollo plugin not registered: {name}"));
575                if let Some(mut plugin_config) = $opt_plugin_config {
576                    if name == "apollo.telemetry" {
577                        // The apollo.telemetry" plugin isn't happy with empty config, so we
578                        // give it some. If any of the other mandatory plugins need special
579                        // treatment, then we'll have to perform it here.
580                        // This is *required* by the telemetry module or it will fail...
581                        inject_schema_id(&supergraph_schema_id, &mut plugin_config);
582                    }
583                    add_plugin!(name.to_string(), factory, plugin_config);
584                }
585            }
586            .instrument(span)
587            .await;
588        }};
589    }
590
591    macro_rules! add_mandatory_apollo_plugin {
592        ($name: literal) => {
593            add_apollo_plugin!(
594                $name,
595                Some(
596                    apollo_plugins_config
597                        .remove($name)
598                        .unwrap_or(Value::Object(Map::new()))
599                )
600            );
601        };
602    }
603
604    macro_rules! add_optional_apollo_plugin {
605        ($name: literal) => {
606            add_apollo_plugin!($name, apollo_plugins_config.remove($name));
607        };
608    }
609
610    macro_rules! add_user_plugins {
611        () => {
612            for (name, plugin_config) in user_plugins_config {
613                let user_span = tracing::info_span!("user_plugin", "name" = &name);
614
615                async {
616                    if let Some(factory) =
617                        plugin_registry.iter().find(|factory| factory.name == name)
618                    {
619                        add_plugin!(name, factory, plugin_config);
620                    } else {
621                        errors.push(ConfigurationError::PluginUnknown(name))
622                    }
623                }
624                .instrument(user_span)
625                .await;
626            }
627
628            plugin_instances.extend(extra);
629        };
630    }
631
632    add_mandatory_apollo_plugin!("include_subgraph_errors");
633    add_mandatory_apollo_plugin!("csrf");
634    add_mandatory_apollo_plugin!("headers");
635    if apollo_telemetry_plugin_mandatory {
636        match initial_telemetry_plugin {
637            None => {
638                add_mandatory_apollo_plugin!("telemetry");
639            }
640            Some(plugin) => {
641                let _ = plugin_instances.insert("apollo.telemetry".to_string(), plugin);
642                apollo_plugins_config.remove("apollo.telemetry");
643                apollo_plugin_factories.remove("apollo.telemetry");
644            }
645        }
646    }
647    add_mandatory_apollo_plugin!("limits");
648    add_mandatory_apollo_plugin!("traffic_shaping");
649    add_mandatory_apollo_plugin!("fleet_detector");
650    add_optional_apollo_plugin!("forbid_mutations");
651    add_optional_apollo_plugin!("subscription");
652    add_optional_apollo_plugin!("override_subgraph_url");
653    add_optional_apollo_plugin!("authorization");
654    add_optional_apollo_plugin!("authentication");
655    add_optional_apollo_plugin!("preview_file_uploads");
656    add_optional_apollo_plugin!("preview_entity_cache");
657    add_mandatory_apollo_plugin!("progressive_override");
658    add_optional_apollo_plugin!("demand_control");
659
660    // This relative ordering is documented in `docs/source/customizations/native.mdx`:
661    add_optional_apollo_plugin!("rhai");
662    add_optional_apollo_plugin!("coprocessor");
663    add_user_plugins!();
664
665    // Because this plugin intercepts subgraph requests
666    // and does not forward them to the next service in the chain,
667    // it needs to intervene after user plugins for users plugins to run at all.
668    add_optional_apollo_plugin!("experimental_mock_subgraphs");
669
670    // Macros above remove from `apollo_plugin_factories`, so anything left at the end
671    // indicates a missing macro call.
672    let unused_apollo_plugin_names = apollo_plugin_factories.keys().copied().collect::<Vec<_>>();
673    if !unused_apollo_plugin_names.is_empty() {
674        panic!(
675            "Apollo plugins without their ordering specified in `fn create_plugins`: {}",
676            unused_apollo_plugin_names.join(", ")
677        )
678    }
679
680    let plugin_details = plugin_instances
681        .iter()
682        .map(|(name, plugin)| (name, plugin.name()))
683        .collect::<Vec<(&String, &str)>>();
684    tracing::debug!(
685        "plugins list: {:?}",
686        plugin_details
687            .iter()
688            .map(|(name, _)| name)
689            .collect::<Vec<&&String>>()
690    );
691
692    if !errors.is_empty() {
693        for error in &errors {
694            tracing::error!("{:#}", error);
695        }
696
697        Err(BoxError::from(format!(
698            "there were {} configuration errors",
699            errors.len()
700        )))
701    } else {
702        Ok(plugin_instances)
703    }
704}
705
706fn inject_schema_id(
707    // Ideally we'd use &SchemaHash, but we'll need to update a bunch of tests to do so
708    schema_id: &str,
709    configuration: &mut Value,
710) {
711    if configuration.get("apollo").is_none() {
712        // Warning: this must be done here, otherwise studio reporting will not work
713        if apollo_key().is_some() && apollo_graph_reference().is_some() {
714            if let Some(telemetry) = configuration.as_object_mut() {
715                telemetry.insert("apollo".to_string(), Value::Object(Default::default()));
716            }
717        } else {
718            return;
719        }
720    }
721    if let Some(apollo) = configuration.get_mut("apollo") {
722        if let Some(apollo) = apollo.as_object_mut() {
723            apollo.insert(
724                "schema_id".to_string(),
725                Value::String(schema_id.to_string()),
726            );
727        }
728    }
729}
730
731#[cfg(test)]
732mod test {
733    use std::sync::Arc;
734
735    use schemars::JsonSchema;
736    use serde::Deserialize;
737    use serde_json::json;
738    use tower_http::BoxError;
739
740    use crate::configuration::Configuration;
741    use crate::plugin::Plugin;
742    use crate::plugin::PluginInit;
743    use crate::register_plugin;
744    use crate::router_factory::RouterSuperServiceFactory;
745    use crate::router_factory::YamlRouterFactory;
746    use crate::router_factory::inject_schema_id;
747    use crate::spec::Schema;
748
749    // Always starts and stops plugin
750
751    #[derive(Debug)]
752    struct AlwaysStartsAndStopsPlugin {}
753
754    /// Configuration for the test plugin
755    #[derive(Debug, Default, Deserialize, JsonSchema)]
756    struct Conf {
757        /// The name of the test
758        name: String,
759    }
760
761    #[async_trait::async_trait]
762    impl Plugin for AlwaysStartsAndStopsPlugin {
763        type Config = Conf;
764
765        async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
766            tracing::debug!("{}", init.config.name);
767            Ok(AlwaysStartsAndStopsPlugin {})
768        }
769    }
770
771    register_plugin!(
772        "test",
773        "always_starts_and_stops",
774        AlwaysStartsAndStopsPlugin
775    );
776
777    // Always fails to start plugin
778
779    #[derive(Debug)]
780    struct AlwaysFailsToStartPlugin {}
781
782    #[async_trait::async_trait]
783    impl Plugin for AlwaysFailsToStartPlugin {
784        type Config = Conf;
785
786        async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
787            tracing::debug!("{}", init.config.name);
788            Err(BoxError::from("Error"))
789        }
790    }
791
792    register_plugin!("test", "always_fails_to_start", AlwaysFailsToStartPlugin);
793
794    #[tokio::test]
795    async fn test_yaml_no_extras() {
796        let config = Configuration::builder().build().unwrap();
797        let service = create_service(config).await;
798        assert!(service.is_ok())
799    }
800
801    #[tokio::test]
802    async fn test_yaml_plugins_always_starts_and_stops() {
803        let config: Configuration = serde_yaml::from_str(
804            r#"
805            plugins:
806                test.always_starts_and_stops:
807                    name: albert
808        "#,
809        )
810        .unwrap();
811        let service = create_service(config).await;
812        assert!(service.is_ok())
813    }
814
815    #[tokio::test]
816    async fn test_yaml_plugins_always_fails_to_start() {
817        let config: Configuration = serde_yaml::from_str(
818            r#"
819            plugins:
820                test.always_fails_to_start:
821                    name: albert
822        "#,
823        )
824        .unwrap();
825        let service = create_service(config).await;
826        assert!(service.is_err())
827    }
828
829    #[tokio::test]
830    async fn test_yaml_plugins_combo_start_and_fail() {
831        let config: Configuration = serde_yaml::from_str(
832            r#"
833            plugins:
834                test.always_starts_and_stops:
835                    name: albert
836                test.always_fails_to_start:
837                    name: albert
838        "#,
839        )
840        .unwrap();
841        let service = create_service(config).await;
842        assert!(service.is_err())
843    }
844
845    async fn create_service(config: Configuration) -> Result<(), BoxError> {
846        let schema = include_str!("testdata/supergraph.graphql");
847        let schema = Schema::parse(schema, &config)?;
848
849        let is_telemetry_disabled = false;
850        let service = YamlRouterFactory
851            .create(
852                is_telemetry_disabled,
853                Arc::new(config),
854                Arc::new(schema),
855                None,
856                None,
857            )
858            .await;
859        service.map(|_| ())
860    }
861
862    #[test]
863    fn test_inject_schema_id() {
864        let mut config = json!({ "apollo": {} });
865        inject_schema_id(
866            "8e2021d131b23684671c3b85f82dfca836908c6a541bbd5c3772c66e7f8429d8",
867            &mut config,
868        );
869        let config =
870            serde_json::from_value::<crate::plugins::telemetry::config::Conf>(config).unwrap();
871        assert_eq!(
872            &config.apollo.schema_id,
873            "8e2021d131b23684671c3b85f82dfca836908c6a541bbd5c3772c66e7f8429d8"
874        );
875    }
876}