use std::collections::HashMap;
use std::collections::HashSet;
use std::io;
use std::sync::Arc;
use apollo_compiler::validation::Valid;
use axum::response::IntoResponse;
use http::StatusCode;
use indexmap::IndexMap;
use multimap::MultiMap;
use rustls::RootCertStore;
use rustls::pki_types::CertificateDer;
use serde_json::Map;
use serde_json::Value;
use tower::BoxError;
use tower::ServiceExt;
use tower::service_fn;
use tower_service::Service;
use tracing::Instrument;
use crate::AllowedFeature;
use crate::ListenAddr;
use crate::configuration::APOLLO_PLUGIN_PREFIX;
use crate::configuration::Configuration;
use crate::configuration::ConfigurationError;
use crate::configuration::TlsClient;
use crate::plugin::DynPlugin;
use crate::plugin::Handler;
use crate::plugin::PluginFactory;
use crate::plugin::PluginInit;
use crate::plugins::subscription::notification::Notify;
use crate::plugins::telemetry::reload::otel::apollo_opentelemetry_initialized;
use crate::plugins::traffic_shaping::APOLLO_TRAFFIC_SHAPING;
use crate::plugins::traffic_shaping::TrafficShaping;
use crate::query_planner::QueryPlannerService;
use crate::services::HasSchema;
use crate::services::PluggableSupergraphServiceBuilder;
use crate::services::Plugins;
use crate::services::SubgraphService;
use crate::services::SupergraphCreator;
use crate::services::apollo_graph_reference;
use crate::services::apollo_key;
use crate::services::http::HttpClientServiceFactory;
use crate::services::layers::persisted_queries::PersistedQueryLayer;
use crate::services::layers::query_analysis::QueryAnalysisLayer;
use crate::services::new_service::ServiceFactory;
use crate::services::router;
use crate::services::router::pipeline_handle::PipelineRef;
use crate::services::router::service::RouterCreator;
use crate::spec::Schema;
use crate::uplink::license_enforcement::LicenseState;
/// Name of the tracing span that `YamlRouterFactory::create` wraps router creation in.
pub(crate) const STARTING_SPAN_NAME: &str = "starting";
/// A web endpoint: a path together with the handler that serves it.
#[derive(Clone)]
pub struct Endpoint {
/// Path the handler is mounted at.
pub(crate) path: String,
/// Either a router service or an axum router; see `EndpointHandler`.
handler: EndpointHandler,
}
/// The two kinds of handler an `Endpoint` can wrap.
#[derive(Clone)]
enum EndpointHandler {
/// A `Handler` built from a boxed router service; `Endpoint::into_router` mounts it
/// with `route_service`.
Service(Handler),
/// A ready-made axum router; `Endpoint::into_router` nests it under the endpoint path.
Router(axum::Router),
}
impl std::fmt::Debug for Endpoint {
    /// Prints only the endpoint path; the handler is deliberately left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Endpoint");
        repr.field("path", &self.path);
        repr.finish()
    }
}
impl Endpoint {
    /// Creates an endpoint that serves `path` with a boxed router service.
    pub fn from_router_service(path: String, handler: router::BoxService) -> Self {
        Self {
            path,
            handler: EndpointHandler::Service(Handler::new(handler)),
        }
    }

    /// Creates an endpoint that serves `path` with an existing `axum::Router`.
    pub(crate) fn from_router(path: String, router: axum::Router) -> Self {
        Self {
            path,
            handler: EndpointHandler::Router(router),
        }
    }

    /// Converts this endpoint into an `axum::Router`.
    ///
    /// A router handler is nested under the endpoint path. A service handler is registered
    /// with `route_service` on the endpoint path; an error returned by the service is turned
    /// into a `500 Internal Server Error` response whose body is the error text.
    pub(crate) fn into_router(self) -> axum::Router {
        match self.handler {
            EndpointHandler::Router(router) => axum::Router::new().nest(&self.path, router),
            EndpointHandler::Service(handler) => {
                // `handler` is owned here, so it moves straight into the closure; only the
                // per-request clone below is needed.
                let serve = move |req: http::Request<axum::body::Body>| {
                    let endpoint = handler.clone();
                    async move {
                        Ok(endpoint
                            .oneshot(req.into())
                            .await
                            .map(|res| res.response)
                            .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
                            .into_response())
                    }
                };
                axum::Router::new().route_service(self.path.as_str(), service_fn(serve))
            }
        }
    }
}
/// A cloneable, thread-safe factory of router services that also exposes the web
/// endpoints to serve and a handle to its pipeline.
pub(crate) trait RouterFactory:
ServiceFactory<router::Request, Service = Self::RouterService> + Clone + Send + Sync + 'static
{
/// The router service produced by this factory: it takes a `router::Request` and
/// yields a `router::Response` or a `BoxError`.
type RouterService: Service<
router::Request,
Response = router::Response,
Error = BoxError,
Future = Self::Future,
> + Send;
/// The future returned by `RouterService`.
type Future: Send;
/// Returns the endpoints to serve, keyed by the address they listen on.
fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint>;
/// Returns a shared handle to this factory's `PipelineRef`.
fn pipeline_ref(&self) -> Arc<PipelineRef>;
}
/// Builds a `RouterFactory` from a configuration and a schema.
#[async_trait::async_trait]
pub(crate) trait RouterSuperServiceFactory: Send + Sync + 'static {
/// The factory type returned by `create`.
type RouterFactory: RouterFactory;
/// Creates a new router factory.
///
/// `previous_router`, when present, is an earlier factory that the implementation may
/// draw on (`YamlRouterFactory` reads its configuration and cache). `extra_plugins` are
/// already-instantiated plugins supplied by the caller.
async fn create<'a>(
&'a mut self,
is_telemetry_disabled: bool,
configuration: Arc<Configuration>,
schema: Arc<Schema>,
previous_router: Option<&'a Self::RouterFactory>,
extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
license: Arc<LicenseState>,
) -> Result<Self::RouterFactory, BoxError>;
}
/// Stateless `RouterSuperServiceFactory` implementation that produces `RouterCreator`s.
#[derive(Default)]
pub(crate) struct YamlRouterFactory;
#[async_trait::async_trait]
impl RouterSuperServiceFactory for YamlRouterFactory {
type RouterFactory = RouterCreator;
async fn create<'a>(
&'a mut self,
_is_telemetry_disabled: bool,
configuration: Arc<Configuration>,
schema: Arc<Schema>,
previous_router: Option<&'a Self::RouterFactory>,
extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
license: Arc<LicenseState>,
) -> Result<Self::RouterFactory, BoxError> {
let plugin_registry = &*crate::plugin::PLUGINS;
let mut initial_telemetry_plugin = None;
if previous_router.is_none()
&& apollo_opentelemetry_initialized()
&& let Some(factory) = plugin_registry
.iter()
.find(|factory| factory.name == "apollo.telemetry")
{
let mut telemetry_config = configuration
.apollo_plugins
.plugins
.get("telemetry")
.cloned();
if let Some(plugin_config) = &mut telemetry_config {
inject_schema_id(schema.schema_id.as_str(), plugin_config);
let previous_telemetry_config = previous_router.and_then(|router| {
router
.configuration
.apollo_plugins
.plugins
.get("telemetry")
.cloned()
});
let telemetry_init = PluginInit::builder()
.config(plugin_config.clone())
.and_previous_config(previous_telemetry_config)
.supergraph_sdl(schema.raw_sdl.clone())
.supergraph_schema_id(schema.schema_id.clone().into_inner())
.supergraph_schema(Arc::new(schema.supergraph_schema().clone()))
.notify(configuration.notify.clone())
.license(license.clone())
.full_config(configuration.validated_yaml.clone())
.and_original_config_yaml(configuration.raw_yaml.clone())
.build();
match factory.create_instance(telemetry_init).await {
Ok(plugin) => {
if let Some(telemetry) = plugin
.as_any()
.downcast_ref::<crate::plugins::telemetry::Telemetry>()
{
telemetry.activate();
}
initial_telemetry_plugin = Some(plugin);
}
Err(e) => return Err(e),
}
}
}
let router_span = tracing::info_span!(STARTING_SPAN_NAME);
Self.inner_create(
configuration,
schema,
previous_router,
initial_telemetry_plugin,
extra_plugins,
license,
)
.instrument(router_span)
.await
}
}
impl YamlRouterFactory {
async fn inner_create<'a>(
&'a mut self,
configuration: Arc<Configuration>,
schema: Arc<Schema>,
previous_router: Option<&'a RouterCreator>,
initial_telemetry_plugin: Option<Box<dyn DynPlugin>>,
extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
license: Arc<LicenseState>,
) -> Result<RouterCreator, BoxError> {
let mut supergraph_creator = self
.inner_create_supergraph(
configuration.clone(),
schema,
initial_telemetry_plugin,
extra_plugins,
license,
previous_router,
)
.await?;
let query_analysis_layer =
QueryAnalysisLayer::new(supergraph_creator.schema(), Arc::clone(&configuration)).await;
let persisted_query_layer = Arc::new(PersistedQueryLayer::new(&configuration).await?);
if let Some(previous_router) = previous_router {
let previous_cache = previous_router.previous_cache();
supergraph_creator
.warm_up_query_planner(
&query_analysis_layer,
&persisted_query_layer,
Some(previous_cache),
configuration.supergraph.query_planning.warmed_up_queries,
configuration
.supergraph
.query_planning
.experimental_reuse_query_plans,
&configuration
.persisted_queries
.experimental_prewarm_query_plan_cache,
)
.await;
} else {
supergraph_creator
.warm_up_query_planner(
&query_analysis_layer,
&persisted_query_layer,
None,
configuration.supergraph.query_planning.warmed_up_queries,
configuration
.supergraph
.query_planning
.experimental_reuse_query_plans,
&configuration
.persisted_queries
.experimental_prewarm_query_plan_cache,
)
.await;
};
RouterCreator::new(
query_analysis_layer,
persisted_query_layer,
Arc::new(supergraph_creator),
configuration,
)
.await
}
pub(crate) async fn inner_create_supergraph(
&mut self,
configuration: Arc<Configuration>,
schema: Arc<Schema>,
initial_telemetry_plugin: Option<Box<dyn DynPlugin>>,
extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
license: Arc<LicenseState>,
previous_router: Option<&crate::services::router::service::RouterCreator>,
) -> Result<SupergraphCreator, BoxError> {
let query_planner_span = tracing::info_span!("query_planner_creation");
let planner = QueryPlannerService::new(schema.clone(), configuration.clone())
.instrument(query_planner_span)
.await?;
let span = tracing::info_span!("plugins");
let subgraph_schemas = Arc::new(
planner
.subgraph_schemas()
.iter()
.map(|(k, v)| (k.clone(), v.schema.clone()))
.collect(),
);
let plugins: Arc<Plugins> = Arc::new(
create_plugins(
&configuration,
&schema,
subgraph_schemas,
initial_telemetry_plugin,
extra_plugins,
license,
previous_router,
)
.instrument(span)
.await?
.into_iter()
.collect(),
);
async {
let mut builder = PluggableSupergraphServiceBuilder::new(planner);
builder = builder.with_configuration(configuration.clone());
let http_service_factory =
create_http_services(&plugins, &schema, &configuration).await?;
let subgraph_services =
create_subgraph_services(&http_service_factory, &configuration).await?;
builder = builder.with_http_service_factory(http_service_factory);
for (name, subgraph_service) in subgraph_services {
builder = builder.with_subgraph_service(&name, subgraph_service);
}
let supergraph_creator = builder.with_plugins(plugins).build().await?;
Ok(supergraph_creator)
}
.instrument(tracing::info_span!("supergraph_creation"))
.await
}
}
/// Builds one `SubgraphService` per entry of `http_service_factory`, keeping the entries'
/// order.
///
/// # Errors
///
/// Returns the first error produced by `SubgraphService::from_config`.
pub(crate) async fn create_subgraph_services(
    http_service_factory: &IndexMap<String, HttpClientServiceFactory>,
    configuration: &Configuration,
) -> Result<IndexMap<String, SubgraphService>, BoxError> {
    let mut services = IndexMap::default();
    for (subgraph_name, factory) in http_service_factory {
        let service =
            SubgraphService::from_config(subgraph_name.clone(), configuration, factory.clone())?;
        services.insert(subgraph_name.clone(), service);
    }
    Ok(services)
}
/// Builds the HTTP client factories, keyed by subgraph name or connector source key.
///
/// Subgraphs that belong to a connector are skipped in the subgraph pass; connector
/// sources get their own clients, built with the connector TLS root store.
///
/// # Errors
///
/// Returns an error when a configured certificate store cannot be built or when an HTTP
/// client cannot be created from the configuration.
///
/// # Panics
///
/// Panics if the traffic shaping plugin is not part of `plugins`.
pub(crate) async fn create_http_services(
    plugins: &Arc<Plugins>,
    schema: &Schema,
    configuration: &Configuration,
) -> Result<IndexMap<String, HttpClientServiceFactory>, BoxError> {
    use crate::services::http::HttpClientService;

    // A configured certificate store wins; otherwise fall back to the native roots.
    let subgraph_tls_root_store: RootCertStore =
        match configuration.tls.subgraph.all.create_certificate_store() {
            Some(store) => store?,
            None => HttpClientService::native_roots_store(),
        };
    let connector_tls_root_store: RootCertStore =
        match configuration.tls.connector.all.create_certificate_store() {
            Some(store) => store?,
            None => HttpClientService::native_roots_store(),
        };

    let shaping = plugins
        .iter()
        .find(|(name, _)| name.as_str() == APOLLO_TRAFFIC_SHAPING)
        .and_then(|(_, plugin)| (*plugin).as_any().downcast_ref::<TrafficShaping>())
        .expect("traffic shaping should always be part of the plugin list");

    let connector_subgraphs: HashSet<String> = match schema.connectors.as_ref() {
        Some(connectors) => connectors
            .by_service_name
            .iter()
            .map(|(_, connector)| connector.id.subgraph_name.clone())
            .collect(),
        None => HashSet::default(),
    };

    let mut http_services = IndexMap::new();

    for (name, _) in schema.subgraphs() {
        // connector subgraphs are served through the connector sources below
        if connector_subgraphs.contains(name) {
            continue;
        }
        let client = HttpClientService::from_config_for_subgraph(
            name,
            configuration,
            &subgraph_tls_root_store,
            shaping.subgraph_client_config(name),
        )?;
        let factory = HttpClientServiceFactory::new(client, plugins.clone());
        http_services.insert(name.clone(), factory);
    }

    let connector_sources = schema
        .connectors
        .as_ref()
        .map(|c| c.source_config_keys.clone())
        .unwrap_or_default();

    for name in connector_sources.iter() {
        let client = HttpClientService::from_config_for_connector(
            name,
            configuration,
            &connector_tls_root_store,
            shaping.connector_client_config(name),
        )?;
        let factory = HttpClientServiceFactory::new(client, plugins.clone());
        http_services.insert(name.clone(), factory);
    }

    Ok(http_services)
}
impl TlsClient {
pub(crate) fn create_certificate_store(
&self,
) -> Option<Result<RootCertStore, ConfigurationError>> {
self.certificate_authorities
.as_deref()
.map(create_certificate_store)
}
}
/// Parses a PEM certificate list into a `RootCertStore`.
///
/// # Errors
///
/// Returns `ConfigurationError::CertificateAuthorities` when the list cannot be parsed,
/// when a certificate cannot be added to the store, or when the resulting store is empty.
pub(crate) fn create_certificate_store(
    certificate_authorities: &str,
) -> Result<RootCertStore, ConfigurationError> {
    let certificates = match load_certs(certificate_authorities) {
        Ok(certificates) => certificates,
        Err(e) => {
            return Err(ConfigurationError::CertificateAuthorities {
                error: format!("could not parse the certificate list: {e}"),
            });
        }
    };

    let mut store = RootCertStore::empty();
    for certificate in certificates {
        if let Err(e) = store.add(certificate) {
            return Err(ConfigurationError::CertificateAuthorities {
                error: format!("could not add certificate to root store: {e}"),
            });
        }
    }

    // an input without any certificate is rejected rather than yielding an empty store
    if store.is_empty() {
        return Err(ConfigurationError::CertificateAuthorities {
            error: "the certificate list is empty".to_string(),
        });
    }
    Ok(store)
}
/// Reads every certificate found in the PEM text `certificates`.
///
/// # Errors
///
/// Returns an `io::Error` with the message "failed to load certificate" as soon as one
/// entry fails to parse; the underlying error is not kept.
fn load_certs(certificates: &str) -> io::Result<Vec<CertificateDer<'static>>> {
    tracing::debug!("loading root certificates");

    let mut pem = certificates.as_bytes();
    let mut loaded = Vec::new();
    for entry in rustls_pemfile::certs(&mut pem) {
        match entry {
            Ok(certificate) => loaded.push(certificate),
            Err(_) => return Err(io::Error::other("failed to load certificate")),
        }
    }
    Ok(loaded)
}
/// Test helper: builds a router from the given schema and YAML configuration and asserts
/// that creation fails with the expected query planner error message.
///
/// # Panics
///
/// Panics when the configuration or schema cannot be parsed, when router creation
/// succeeds, or when the error message differs from the expected one.
pub async fn create_test_service_factory_from_yaml(schema: &str, configuration: &str) {
    let config: Configuration = serde_yaml::from_str(configuration).unwrap();
    let parsed_schema = Arc::new(Schema::parse(schema, &config).unwrap());

    let is_telemetry_disabled = false;
    let outcome = YamlRouterFactory
        .create(
            is_telemetry_disabled,
            Arc::new(config),
            parsed_schema,
            None,
            None,
            Default::default(),
        )
        .await;

    // the success value is mapped to `()` before `unwrap_err` is called
    let error_message = outcome.map(|_| ()).unwrap_err().to_string();
    assert_eq!(
        error_message.as_str(),
        r#"failed to initialize the query planner: An internal error has occurred, please report this bug to Apollo.
Details: Object field "Product.reviews"'s inner type "Review" does not refer to an existing output type."#
    );
}
/// Instantiates one plugin through `factory` and records the outcome.
///
/// On success the instance is stored in `plugin_instances` under `name`. On failure a
/// `ConfigurationError::PluginConfiguration` carrying the plugin name and the error text
/// is appended to `errors`; the function itself never fails.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn add_plugin(
    name: String,
    factory: &PluginFactory,
    plugin_config: &Value,
    previous_plugin_config: Option<&Value>,
    schema: Arc<String>,
    schema_id: Arc<String>,
    supergraph_schema: Arc<Valid<apollo_compiler::Schema>>,
    subgraph_schemas: Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
    launch_id: Option<Arc<String>>,
    notify: &Notify<String, crate::graphql::Response>,
    plugin_instances: &mut Plugins,
    errors: &mut Vec<ConfigurationError>,
    license: Arc<LicenseState>,
    full_config: Option<Value>,
    original_config_yaml: Option<Arc<str>>,
) {
    let init = PluginInit::builder()
        .config(plugin_config.clone())
        .and_previous_config(previous_plugin_config.cloned())
        .supergraph_sdl(schema)
        .supergraph_schema_id(schema_id)
        .supergraph_schema(supergraph_schema)
        .subgraph_schemas(subgraph_schemas)
        .launch_id(launch_id)
        .notify(notify.clone())
        .license(license)
        .and_full_config(full_config)
        .and_original_config_yaml(original_config_yaml)
        .build();

    let plugin = match factory.create_instance(init).await {
        Ok(plugin) => plugin,
        Err(err) => {
            errors.push(ConfigurationError::PluginConfiguration {
                plugin: name,
                error: err.to_string(),
            });
            return;
        }
    };

    // an instance already stored under this name is replaced and dropped
    let _ = plugin_instances.insert(name, plugin);
}
pub(crate) async fn create_plugins(
configuration: &Configuration,
schema: &Schema,
subgraph_schemas: Arc<HashMap<String, Arc<Valid<apollo_compiler::Schema>>>>,
initial_telemetry_plugin: Option<Box<dyn DynPlugin>>,
extra_plugins: Option<Vec<(String, Box<dyn DynPlugin>)>>,
license: Arc<LicenseState>,
previous_router: Option<&crate::services::router::service::RouterCreator>,
) -> Result<Plugins, BoxError> {
let supergraph_schema = Arc::new(schema.supergraph_schema().clone());
let supergraph_schema_id = schema.schema_id.clone().into_inner();
let mut apollo_plugins_config = configuration.apollo_plugins.clone().plugins;
let user_plugins_config = configuration.plugins.clone().plugins.unwrap_or_default();
let (previous_apollo_plugins_config, previous_user_plugins_config) = match previous_router {
Some(router) => {
let prev_apollo_configs: HashMap<&str, &Value> = router
.configuration
.apollo_plugins
.plugins
.iter()
.map(|(k, v)| (k.as_str(), v))
.collect();
let prev_user_configs: HashMap<String, &Value> = router
.configuration
.plugins
.plugins
.as_ref()
.map(|plugins| plugins.iter().map(|(k, v)| (k.clone(), v)).collect())
.unwrap_or_default();
(prev_apollo_configs, prev_user_configs)
}
None => (HashMap::new(), HashMap::new()),
};
let extra = extra_plugins.unwrap_or_default();
let plugin_registry = &*crate::plugin::PLUGINS;
let apollo_telemetry_plugin_mandatory = apollo_opentelemetry_initialized();
let mut apollo_plugin_factories: HashMap<&str, &PluginFactory> = plugin_registry
.iter()
.filter(|factory| {
factory.name.starts_with(APOLLO_PLUGIN_PREFIX)
&& (
apollo_telemetry_plugin_mandatory ||
factory.name != "apollo.telemetry"
)
})
.map(|factory| (factory.name.as_str(), &**factory))
.collect();
let mut errors = Vec::new();
let mut plugin_instances = Plugins::default();
macro_rules! add_plugin {
($name: expr, $factory: expr, $plugin_config: expr, $maybe_full_config: expr, $previous_plugin_config: expr) => {{
add_plugin(
$name,
$factory,
&$plugin_config,
$previous_plugin_config,
schema.as_string().clone(),
supergraph_schema_id.clone(),
supergraph_schema.clone(),
subgraph_schemas.clone(),
schema.launch_id.clone(),
&configuration.notify.clone(),
&mut plugin_instances,
&mut errors,
license.clone(),
$maybe_full_config,
configuration.raw_yaml.clone(),
)
.await;
}};
}
macro_rules! add_mandatory_apollo_plugin_inner {
($name: literal, $opt_plugin_config: expr) => {{
let name = concat!("apollo.", $name);
let span = tracing::info_span!(concat!("plugin: ", "apollo.", $name));
async {
let factory = apollo_plugin_factories
.remove(name)
.unwrap_or_else(|| panic!("Apollo plugin not registered: {name}"));
if let Some(mut plugin_config) = $opt_plugin_config {
let mut full_config = None;
if name == "apollo.telemetry" {
inject_schema_id(&supergraph_schema_id, &mut plugin_config);
full_config = configuration.validated_yaml.clone();
}
let previous_config = previous_apollo_plugins_config.get($name).copied();
add_plugin!(
name.to_string(),
factory,
plugin_config,
full_config,
previous_config
);
}
}
.instrument(span)
.await;
}};
}
macro_rules! add_optional_apollo_plugin_inner {
($name: literal, $opt_plugin_config: expr, $license: expr) => {{
let name = concat!("apollo.", $name);
let span = tracing::info_span!(concat!("plugin: ", "apollo.", $name));
async {
let factory = apollo_plugin_factories
.remove(name)
.unwrap_or_else(|| panic!("Apollo plugin not registered: {name}"));
if let Some(plugin_config) = $opt_plugin_config {
let allowed_features = $license.get_allowed_features();
match AllowedFeature::from_plugin_name($name) {
Some(allowed_feature) => {
if allowed_features.contains(&allowed_feature) {
let previous_config = previous_apollo_plugins_config.get($name).copied();
add_plugin!(name.to_string(), factory, plugin_config, None, previous_config);
} else {
tracing::warn!(
"{name} plugin is not registered, {name} is a restricted feature that requires a license"
);
}
}
None => {
let previous_config = previous_apollo_plugins_config.get($name).copied();
add_plugin!(name.to_string(), factory, plugin_config, None, previous_config);
}
}
}
}
.instrument(span)
.await;
}};
}
macro_rules! add_oss_apollo_plugin_inner {
($name: literal, $opt_plugin_config: expr) => {{
let name = concat!("apollo.", $name);
let span = tracing::info_span!(concat!("plugin: ", "apollo.", $name));
async {
let factory = apollo_plugin_factories
.remove(name)
.unwrap_or_else(|| panic!("Apollo plugin not registered: {name}"));
if let Some(plugin_config) = $opt_plugin_config {
let previous_config = previous_apollo_plugins_config.get($name).copied();
add_plugin!(
name.to_string(),
factory,
plugin_config,
None,
previous_config
);
return;
}
}
.instrument(span)
.await;
}};
}
macro_rules! add_mandatory_apollo_plugin {
($name: literal) => {
add_mandatory_apollo_plugin_inner!(
$name,
Some(
apollo_plugins_config
.remove($name)
.unwrap_or(Value::Object(Map::new()))
)
);
};
}
macro_rules! add_optional_apollo_plugin {
($name: literal) => {
add_optional_apollo_plugin_inner!($name, apollo_plugins_config.remove($name), &license);
};
}
macro_rules! add_oss_apollo_plugin {
($name: literal) => {
add_oss_apollo_plugin_inner!($name, apollo_plugins_config.remove($name));
};
}
macro_rules! add_user_plugins {
() => {
for (name, plugin_config) in user_plugins_config {
let user_span = tracing::info_span!("user_plugin", "name" = &name);
async {
if let Some(factory) =
plugin_registry.iter().find(|factory| factory.name == name)
{
let previous_config = previous_user_plugins_config.get(&name).copied();
add_plugin!(name, factory, plugin_config, None, previous_config);
} else {
errors.push(ConfigurationError::PluginUnknown(name))
}
}
.instrument(user_span)
.await;
}
plugin_instances.extend(extra);
};
}
add_mandatory_apollo_plugin!("include_subgraph_errors");
add_mandatory_apollo_plugin!("headers");
if apollo_telemetry_plugin_mandatory {
match initial_telemetry_plugin {
None => {
add_mandatory_apollo_plugin!("telemetry");
}
Some(plugin) => {
let _ = plugin_instances.insert("apollo.telemetry".to_string(), plugin);
apollo_plugins_config.remove("apollo.telemetry");
apollo_plugin_factories.remove("apollo.telemetry");
}
}
}
add_mandatory_apollo_plugin!("license_enforcement");
add_mandatory_apollo_plugin!("health_check");
add_mandatory_apollo_plugin!("traffic_shaping");
add_mandatory_apollo_plugin!("limits");
add_mandatory_apollo_plugin!("csrf");
add_mandatory_apollo_plugin!("fleet_detector");
add_mandatory_apollo_plugin!("enhanced_client_awareness");
add_mandatory_apollo_plugin!("experimental_diagnostics");
add_oss_apollo_plugin!("forbid_mutations");
add_optional_apollo_plugin!("subscription");
add_oss_apollo_plugin!("override_subgraph_url");
add_optional_apollo_plugin!("authorization");
add_optional_apollo_plugin!("authentication");
add_oss_apollo_plugin!("preview_file_uploads");
add_optional_apollo_plugin!("preview_entity_cache");
add_mandatory_apollo_plugin!("progressive_override");
add_optional_apollo_plugin!("demand_control");
add_oss_apollo_plugin!("connectors");
add_oss_apollo_plugin!("rhai");
add_optional_apollo_plugin!("coprocessor");
add_optional_apollo_plugin!("response_cache");
add_user_plugins!();
add_optional_apollo_plugin!("experimental_mock_subgraphs");
let unused_apollo_plugin_names = apollo_plugin_factories.keys().copied().collect::<Vec<_>>();
if !unused_apollo_plugin_names.is_empty() {
panic!(
"Apollo plugins without their ordering specified in `fn create_plugins`: {}",
unused_apollo_plugin_names.join(", ")
)
}
let plugin_details = plugin_instances
.iter()
.map(|(name, plugin)| (name, plugin.name()))
.collect::<Vec<(&String, &str)>>();
tracing::debug!(
"plugins list: {:?}",
plugin_details
.iter()
.map(|(name, _)| name)
.collect::<Vec<&&String>>()
);
if !errors.is_empty() {
for error in &errors {
tracing::error!("{:#}", error);
}
let errors_list = errors
.iter()
.map(ToString::to_string)
.collect::<Vec<String>>()
.join("\n");
Err(BoxError::from(format!(
"there were {} configuration errors\n{}",
errors.len(),
errors_list
)))
} else {
Ok(plugin_instances)
}
}
/// Writes `schema_id` into the `apollo` section of a telemetry plugin configuration.
///
/// When the configuration has no `apollo` entry, one is created only if both the Apollo
/// key and the Apollo graph reference are available; otherwise the configuration is left
/// untouched. Nothing is written when `apollo` exists but is not an object.
fn inject_schema_id(schema_id: &str, configuration: &mut Value) {
    if configuration.get("apollo").is_none() {
        // without Apollo credentials there is no reason to create the section
        if apollo_key().is_none() || apollo_graph_reference().is_none() {
            return;
        }
        if let Some(telemetry) = configuration.as_object_mut() {
            telemetry.insert("apollo".to_string(), Value::Object(Default::default()));
        }
    }

    let Some(apollo) = configuration
        .get_mut("apollo")
        .and_then(Value::as_object_mut)
    else {
        return;
    };
    apollo.insert(
        "schema_id".to_string(),
        Value::String(schema_id.to_string()),
    );
}
#[cfg(test)]
mod test {
use std::collections::HashSet;
use std::sync::Arc;
use rstest::rstest;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_json::json;
use tower_http::BoxError;
use crate::AllowedFeature;
use crate::configuration::Configuration;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::register_plugin;
use crate::router_factory::RouterSuperServiceFactory;
use crate::router_factory::YamlRouterFactory;
use crate::router_factory::inject_schema_id;
use crate::services::supergraph::service::HasPlugins;
use crate::spec::Schema;
use crate::uplink::license_enforcement::LicenseLimits;
use crate::uplink::license_enforcement::LicenseState;
// Apollo plugins that the tests below expect to be present whatever the license allows.
// NOTE(review): `create_plugins` also adds `apollo.experimental_diagnostics` as a mandatory
// plugin, and it is not listed here — confirm whether that is intentional.
const MANDATORY_PLUGINS: &[&str] = &[
"apollo.include_subgraph_errors",
"apollo.headers",
"apollo.license_enforcement",
"apollo.health_check",
"apollo.traffic_shaping",
"apollo.limits",
"apollo.csrf",
"apollo.fleet_detector",
"apollo.enhanced_client_awareness",
"apollo.progressive_override",
];
// Apollo plugins that `test_oss_plugins_added` configures and expects to be present
// whatever the license allows.
const OSS_PLUGINS: &[&str] = &[
"apollo.forbid_mutations",
"apollo.override_subgraph_url",
"apollo.connectors",
];
// Test plugin whose creation always succeeds.
#[derive(Debug)]
struct AlwaysStartsAndStopsPlugin {}
// Configuration shared by the two test plugins.
#[derive(Debug, Default, Deserialize, JsonSchema)]
struct Conf {
// Only logged at debug level when a test plugin is created.
name: String,
}
#[async_trait::async_trait]
impl Plugin for AlwaysStartsAndStopsPlugin {
type Config = Conf;
// Logs the configured name and always succeeds.
async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
tracing::debug!("{}", init.config.name);
Ok(AlwaysStartsAndStopsPlugin {})
}
}
// Registers the plugin; the YAML in the tests below refers to it as
// `test.always_starts_and_stops`.
register_plugin!(
"test",
"always_starts_and_stops",
AlwaysStartsAndStopsPlugin
);
// Test plugin whose creation always fails.
#[derive(Debug)]
struct AlwaysFailsToStartPlugin {}
#[async_trait::async_trait]
impl Plugin for AlwaysFailsToStartPlugin {
type Config = Conf;
// Logs the configured name and always returns an error.
async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
tracing::debug!("{}", init.config.name);
Err(BoxError::from("Error"))
}
}
// Registers the plugin; the YAML in the tests below refers to it as
// `test.always_fails_to_start`.
register_plugin!("test", "always_fails_to_start", AlwaysFailsToStartPlugin);
// Builds a router from `config` and the test supergraph, discarding the router itself so
// that callers only see whether creation succeeded.
async fn create_service(config: Configuration) -> Result<(), BoxError> {
    let parsed_schema = Schema::parse(include_str!("testdata/supergraph.graphql"), &config)?;

    let is_telemetry_disabled = false;
    let creation = YamlRouterFactory
        .create(
            is_telemetry_disabled,
            Arc::new(config),
            Arc::new(parsed_schema),
            None,
            None,
            Arc::new(LicenseState::default()),
        )
        .await;

    match creation {
        Ok(_) => Ok(()),
        Err(err) => Err(err),
    }
}
// A default configuration, without any user plugin, must produce a router.
#[tokio::test]
async fn test_yaml_no_extras() {
let config = Configuration::builder().build().unwrap();
let service = create_service(config).await;
assert!(service.is_ok())
}
// A user plugin that starts successfully must not prevent router creation.
#[tokio::test]
async fn test_yaml_plugins_always_starts_and_stops() {
let config: Configuration = serde_yaml::from_str(
r#"
plugins:
test.always_starts_and_stops:
name: albert
"#,
)
.unwrap();
let service = create_service(config).await;
assert!(service.is_ok())
}
// A user plugin that fails to start must make router creation fail.
#[tokio::test]
async fn test_yaml_plugins_always_fails_to_start() {
let config: Configuration = serde_yaml::from_str(
r#"
plugins:
test.always_fails_to_start:
name: albert
"#,
)
.unwrap();
let service = create_service(config).await;
assert!(service.is_err())
}
// One failing user plugin is enough to make router creation fail, even next to a plugin
// that starts successfully.
#[tokio::test]
async fn test_yaml_plugins_combo_start_and_fail() {
let config: Configuration = serde_yaml::from_str(
r#"
plugins:
test.always_starts_and_stops:
name: albert
test.always_fails_to_start:
name: albert
"#,
)
.unwrap();
let service = create_service(config).await;
assert!(service.is_err())
}
// With an existing (empty) `apollo` section, the schema id must be written into it and be
// readable through the deserialized telemetry configuration.
#[test]
fn test_inject_schema_id() {
let mut config = json!({ "apollo": {} });
inject_schema_id(
"8e2021d131b23684671c3b85f82dfca836908c6a541bbd5c3772c66e7f8429d8",
&mut config,
);
let config =
serde_json::from_value::<crate::plugins::telemetry::config::Conf>(config).unwrap();
assert_eq!(
&config.apollo.schema_id,
"8e2021d131b23684671c3b85f82dfca836908c6a541bbd5c3772c66e7f8429d8"
);
}
// Returns the YAML configuration the tests use for the Apollo plugin with the given
// unprefixed name. Panics for a plugin name that has no configuration here.
fn get_plugin_config(plugin: &str) -> &str {
match plugin {
"subscription" => {
r#"
enabled: true
"#
}
"authentication" => {
r#"
connector:
sources: {}
"#
}
"authorization" => {
r#"
require_authentication: false
"#
}
"preview_file_uploads" => {
r#"
enabled: true
protocols:
multipart:
enabled: false
"#
}
"preview_entity_cache" => {
r#"
enabled: true
subgraph:
all:
enabled: true
"#
}
"response_cache" => {
r#"
enabled: true
subgraph:
all:
enabled: true
"#
}
"demand_control" => {
r#"
enabled: true
mode: measure
strategy:
static_estimated:
list_size: 0
max: 0.0
"#
}
"coprocessor" => {
r#"
url: http://service.example.com/url
"#
}
"connectors" => {
r#"
debug_extensions: false
"#
}
"experimental_mock_subgraphs" => {
r#"
subgraphs: {}
"#
}
"forbid_mutations" => {
r#"
false
"#
}
"override_subgraph_url" => {
r#"
{}
"#
}
// fail loudly when a test asks for a plugin that has no configuration here
_ => panic!("This function does not contain config for plugin: {plugin}"),
}
}
// Mandatory plugins must be present whatever the license allows, with a default
// configuration.
#[tokio::test]
#[rstest]
#[case::empty_allowed_features_set(HashSet::new())]
#[case::nonempty_allowed_features_set(HashSet::from_iter(vec![AllowedFeature::Coprocessors]))]
async fn test_mandatory_plugins_added(#[case] allowed_features: HashSet<AllowedFeature>) {
    let limits = LicenseLimits {
        tps: None,
        allowed_features,
    };
    let license = LicenseState::Licensed {
        limits: Some(limits),
    };

    let router_config = Configuration::builder().build().unwrap();
    let schema =
        Schema::parse(include_str!("testdata/supergraph.graphql"), &router_config).unwrap();

    let is_telemetry_disabled = false;
    let service = YamlRouterFactory
        .create(
            is_telemetry_disabled,
            Arc::new(router_config),
            Arc::new(schema),
            None,
            None,
            Arc::new(license),
        )
        .await
        .unwrap();

    let all_mandatory_present = MANDATORY_PLUGINS
        .iter()
        .all(|plugin| service.supergraph_creator.plugins().contains_key(*plugin));
    assert!(all_mandatory_present);
}
// OSS plugins must be added as soon as they are configured, whatever the license allows.
#[tokio::test]
#[rstest]
#[case::allowed_features_empty(HashSet::new())]
#[case::allowed_features_nonempty(HashSet::from_iter(vec![
    AllowedFeature::Coprocessors,
    AllowedFeature::DemandControl
]))]
async fn test_oss_plugins_added(#[case] allowed_features: HashSet<AllowedFeature>) {
    let limits = LicenseLimits {
        tps: None,
        allowed_features,
    };
    let license = LicenseState::Licensed {
        limits: Some(limits),
    };

    // parse the three plugin configurations in the same order as before
    let forbid_mutations_config =
        serde_yaml::from_str::<serde_json::Value>(get_plugin_config("forbid_mutations"))
            .unwrap();
    let override_subgraph_url_config =
        serde_yaml::from_str::<serde_json::Value>(get_plugin_config("override_subgraph_url"))
            .unwrap();
    let connectors_config =
        serde_yaml::from_str::<serde_json::Value>(get_plugin_config("connectors")).unwrap();

    let router_config = Configuration::builder()
        .apollo_plugin("forbid_mutations", forbid_mutations_config)
        .apollo_plugin("override_subgraph_url", override_subgraph_url_config)
        .apollo_plugin("connectors", connectors_config)
        .build()
        .unwrap();
    let schema =
        Schema::parse(include_str!("testdata/supergraph.graphql"), &router_config).unwrap();

    let is_telemetry_disabled = false;
    let service = YamlRouterFactory
        .create(
            is_telemetry_disabled,
            Arc::new(router_config),
            Arc::new(schema),
            None,
            None,
            Arc::new(license),
        )
        .await
        .unwrap();

    let all_oss_present = OSS_PLUGINS
        .iter()
        .all(|plugin| service.supergraph_creator.plugins().contains_key(*plugin));
    assert!(all_oss_present);
}
// A configured optional plugin must be added when the license's allowed features include
// the feature that gates it; the mandatory plugins must be present as well.
#[tokio::test]
#[rstest]
#[case::subscriptions(
    "subscription",
    HashSet::from_iter(vec![AllowedFeature::DemandControl, AllowedFeature::Subscriptions]))
]
#[case::authorization(
    "authorization",
    HashSet::from_iter(vec![AllowedFeature::Authorization, AllowedFeature::Subscriptions]))
]
#[case::authentication(
    "authentication",
    HashSet::from_iter(vec![AllowedFeature::DemandControl, AllowedFeature::Authentication, AllowedFeature::Subscriptions]))
]
#[case::entity_caching(
    "preview_entity_cache",
    HashSet::from_iter(vec![AllowedFeature::EntityCaching, AllowedFeature::DemandControl]))
]
#[case::response_cache(
    "response_cache",
    HashSet::from_iter(vec![AllowedFeature::DemandControl, AllowedFeature::ResponseCaching]))
]
#[case::demand_control(
    "demand_control",
    HashSet::from_iter(vec![AllowedFeature::Authorization, AllowedFeature::Subscriptions, AllowedFeature::DemandControl]))
]
#[case::coprocessor(
    "coprocessor",
    HashSet::from_iter(vec![AllowedFeature::Coprocessors, AllowedFeature::DemandControl]))
]
async fn test_optional_plugin_added_with_restricted_allowed_features(
    #[case] plugin: &str,
    #[case] allowed_features: HashSet<AllowedFeature>,
) {
    let license = LicenseState::Licensed {
        limits: Some(LicenseLimits {
            tps: None,
            allowed_features,
        }),
    };
    let plugin_config =
        serde_yaml::from_str::<serde_json::Value>(get_plugin_config(plugin)).unwrap();
    let router_config = Configuration::builder()
        .apollo_plugin(plugin, plugin_config)
        .build()
        .unwrap();
    let schema = include_str!("testdata/supergraph.graphql");
    let schema = Schema::parse(schema, &router_config).unwrap();
    let is_telemetry_disabled = false;
    let service = YamlRouterFactory
        .create(
            is_telemetry_disabled,
            Arc::new(router_config),
            Arc::new(schema),
            None,
            None,
            Arc::new(license),
        )
        .await
        .unwrap();
    assert!(
        service
            .supergraph_creator
            .plugins()
            .contains_key(&format!("apollo.{plugin}")),
        "Plugin {plugin} should have been added"
    );
    assert!(
        MANDATORY_PLUGINS
            .iter()
            .all(|plugin| { service.supergraph_creator.plugins().contains_key(*plugin) })
    );
}
// Checks that an optional plugin is NOT registered when the license restricts
// `allowed_features` to a set that leaves it out.
//
// Each case pairs a plugin name with a set of allowed features that omits the
// feature named after that plugin (presumably the feature that gates it --
// confirm against the plugin/feature mapping). Router creation is still
// expected to succeed (the result is unwrapped); the test then asserts that
// `apollo.{plugin}` is absent from the supergraph creator's plugin map while
// every entry of `MANDATORY_PLUGINS` is present.
#[tokio::test]
#[rstest]
#[case::subscriptions(
    "subscription",
    HashSet::from_iter(vec![])
)]
#[case::authorization(
    "authorization",
    HashSet::from_iter(vec![AllowedFeature::Authentication, AllowedFeature::Subscriptions])
)]
#[case::authentication(
    "authentication",
    HashSet::from_iter(vec![AllowedFeature::DemandControl, AllowedFeature::Subscriptions])
)]
#[case::entity_caching(
    "preview_entity_cache",
    HashSet::from_iter(vec![AllowedFeature::DemandControl])
)]
#[case::response_cache(
    "response_cache",
    HashSet::from_iter(vec![AllowedFeature::EntityCaching])
)]
#[case::demand_control(
    "demand_control",
    HashSet::from_iter(vec![AllowedFeature::Authorization, AllowedFeature::Subscriptions, AllowedFeature::Experimental])
)]
#[case::coprocessor(
    "coprocessor",
    HashSet::from_iter(vec![AllowedFeature::DemandControl])
)]
async fn test_optional_plugin_not_added_with_restricted_allowed_features(
    #[case] plugin: &str,
    #[case] allowed_features: HashSet<AllowedFeature>,
) {
    // License whose limits carry the case's restricted feature set.
    let license = LicenseState::Licensed {
        limits: Some(LicenseLimits {
            tps: None,
            allowed_features,
        }),
    };

    // The plugin is still present in the configuration; only the license differs.
    let plugin_config =
        serde_yaml::from_str::<serde_json::Value>(get_plugin_config(plugin)).unwrap();
    let router_config = Configuration::builder()
        .apollo_plugin(plugin, plugin_config)
        .build()
        .unwrap();

    let schema = include_str!("testdata/supergraph.graphql");
    let schema = Schema::parse(schema, &router_config).unwrap();

    let is_telemetry_disabled = false;
    let service = YamlRouterFactory
        .create(
            is_telemetry_disabled,
            Arc::new(router_config),
            Arc::new(schema),
            None,
            None,
            Arc::new(license),
        )
        .await
        .unwrap();

    // The configured plugin must have been left out of the plugin map.
    assert!(
        !service
            .supergraph_creator
            .plugins()
            .contains_key(&format!("apollo.{plugin}")),
        "Plugin {plugin} should not have been added"
    );
    // Mandatory plugins are registered regardless of the restricted license.
    assert!(
        MANDATORY_PLUGINS
            .iter()
            .all(|plugin| { service.supergraph_creator.plugins().contains_key(*plugin) })
    );
}
// Checks that an optional plugin is registered under a restricted license
// even when the allowed-feature set does not mention it.
//
// Both cases configure `experimental_mock_subgraphs`, once with a non-empty
// `allowed_features` set and once with an empty one. The test asserts that
// `apollo.{plugin}` and every entry of `MANDATORY_PLUGINS` are in the
// supergraph creator's plugin map, and that the unconfigured
// `apollo.subscription` and `apollo.coprocessor` plugins are not.
#[tokio::test]
#[rstest]
#[case::mock_subgraphs_non_empty_allowed_features(
    "experimental_mock_subgraphs",
    HashSet::from_iter(vec![AllowedFeature::DemandControl])
)]
#[case::mock_subgraphs_empty_allowed_features(
    "experimental_mock_subgraphs",
    HashSet::from_iter(vec![])
)]
async fn test_optional_plugin_that_does_not_map_to_an_allowed_feature_is_added(
    #[case] plugin: &str,
    #[case] allowed_features: HashSet<AllowedFeature>,
) {
    // License whose limits carry the case's restricted feature set.
    let license = LicenseState::Licensed {
        limits: Some(LicenseLimits {
            tps: None,
            allowed_features,
        }),
    };

    // Configure only the plugin under test, using its YAML fixture.
    let plugin_config =
        serde_yaml::from_str::<serde_json::Value>(get_plugin_config(plugin)).unwrap();
    let router_config = Configuration::builder()
        .apollo_plugin(plugin, plugin_config)
        .build()
        .unwrap();

    let schema = include_str!("testdata/supergraph.graphql");
    let schema = Schema::parse(schema, &router_config).unwrap();

    let is_telemetry_disabled = false;
    let service = YamlRouterFactory
        .create(
            is_telemetry_disabled,
            Arc::new(router_config),
            Arc::new(schema),
            None,
            None,
            Arc::new(license),
        )
        .await
        .unwrap();

    // The configured plugin must be registered under its `apollo.` key.
    assert!(
        service
            .supergraph_creator
            .plugins()
            .contains_key(&format!("apollo.{plugin}")),
        "Plugin {plugin} should have been added"
    );
    assert!(
        MANDATORY_PLUGINS
            .iter()
            .all(|plugin| { service.supergraph_creator.plugins().contains_key(*plugin) })
    );

    // Plugins that were never configured must stay absent. The failure message
    // names the plugin actually being checked, not the case's `plugin`.
    for unconfigured in ["subscription", "coprocessor"] {
        assert!(
            !service
                .supergraph_creator
                .plugins()
                .contains_key(&format!("apollo.{unconfigured}")),
            "Plugin {unconfigured} should not have been added"
        );
    }
}
// Checks that each optional plugin is registered when the license's `limits`
// are left at their default value (no explicit `LicenseLimits` is supplied).
//
// Every case configures a single plugin from its YAML fixture, builds the
// router, and asserts that `apollo.{plugin}` and every entry of
// `MANDATORY_PLUGINS` are in the supergraph creator's plugin map.
#[tokio::test]
#[rstest]
#[case::forbid_mutations("forbid_mutations")]
#[case::subscriptions("subscription")]
#[case::override_subgraph_url("override_subgraph_url")]
#[case::authorization("authorization")]
#[case::authentication("authentication")]
#[case::file_upload("preview_file_uploads")]
#[case::entity_cache("preview_entity_cache")]
#[case::response_cache("response_cache")]
#[case::demand_control("demand_control")]
#[case::connectors("connectors")]
#[case::coprocessor("coprocessor")]
#[case::mock_subgraphs("experimental_mock_subgraphs")]
async fn test_optional_plugin_with_unrestricted_allowed_features(#[case] plugin: &str) {
    let default_limits_license = LicenseState::Licensed {
        limits: Default::default(),
    };

    // Router configuration containing only the plugin under test.
    let fixture: serde_json::Value = serde_yaml::from_str(get_plugin_config(plugin)).unwrap();
    let config = Configuration::builder()
        .apollo_plugin(plugin, fixture)
        .build()
        .unwrap();

    let supergraph =
        Schema::parse(include_str!("testdata/supergraph.graphql"), &config).unwrap();

    let telemetry_disabled = false;
    let router = YamlRouterFactory
        .create(
            telemetry_disabled,
            Arc::new(config),
            Arc::new(supergraph),
            None,
            None,
            Arc::new(default_limits_license),
        )
        .await
        .unwrap();

    // Look the plugin map up once and reuse it for both checks.
    let registered = router.supergraph_creator.plugins();
    let expected_key = format!("apollo.{plugin}");
    assert!(
        registered.contains_key(&expected_key),
        "Plugin {plugin} should have been added"
    );
    assert!(
        MANDATORY_PLUGINS
            .iter()
            .all(|mandatory| registered.contains_key(*mandatory))
    );
}
#[tokio::test]
#[rstest]
#[case::forbid_mutations("forbid_mutations")]
#[case::subscriptions("subscription")]
#[case::override_subgraph_url("override_subgraph_url")]
#[case::authorization("authorization")]
#[case::authentication("authentication")]
#[case::file_upload("preview_file_uploads")]
#[case::response_cache("response_cache")]
#[case::demand_control("demand_control")]
#[case::connectors("connectors")]
#[case::coprocessor("coprocessor")]
#[case::mock_subgraphs("experimental_mock_subgraphs")]
async fn test_optional_plugin_with_default_license_limits(#[case] plugin: &str) {
let license = LicenseState::Licensed {
limits: Default::default(),
};
let plugin_config =
serde_yaml::from_str::<serde_json::Value>(get_plugin_config(plugin)).unwrap();
let forbid_mutations_config =
serde_yaml::from_str::<serde_json::Value>(get_plugin_config("forbid_mutations"))
.unwrap();
let override_subgraph_url_config =
serde_yaml::from_str::<serde_json::Value>(get_plugin_config("override_subgraph_url"))
.unwrap();
let connectors_config =
serde_yaml::from_str::<serde_json::Value>(get_plugin_config("connectors")).unwrap();
let response_cache_config =
serde_yaml::from_str::<serde_json::Value>(get_plugin_config("response_cache")).unwrap();
let router_config = Configuration::builder()
.apollo_plugin("forbid_mutations", forbid_mutations_config)
.apollo_plugin("override_subgraph_url", override_subgraph_url_config)
.apollo_plugin("connectors", connectors_config)
.apollo_plugin("response_cache", response_cache_config)
.apollo_plugin(plugin, plugin_config)
.build()
.unwrap();
let schema = include_str!("testdata/supergraph.graphql");
let schema = Schema::parse(schema, &router_config).unwrap();
let is_telemetry_disabled = false;
let service = YamlRouterFactory
.create(
is_telemetry_disabled,
Arc::new(router_config),
Arc::new(schema),
None,
None,
Arc::new(license),
)
.await
.unwrap();
assert!(
service
.supergraph_creator
.plugins()
.contains_key(&format!("apollo.{plugin}")),
"Plugin {plugin} should have been added"
);
assert!(
MANDATORY_PLUGINS
.iter()
.all(|plugin| { service.supergraph_creator.plugins().contains_key(*plugin) })
);
assert!(
OSS_PLUGINS
.iter()
.all(|plugin| { service.supergraph_creator.plugins().contains_key(*plugin) })
);
}
}