pub mod serde;
#[macro_use]
pub mod test;
use std::any::TypeId;
use std::collections::HashMap;
use std::fmt;
use std::path::PathBuf;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use ::serde::Deserialize;
use ::serde::de::DeserializeOwned;
use apollo_compiler::Schema;
use apollo_compiler::validation::Valid;
use async_trait::async_trait;
use futures::future::BoxFuture;
use multimap::MultiMap;
use once_cell::sync::Lazy;
use schemars::JsonSchema;
use schemars::gen::SchemaGenerator;
use tower::BoxError;
use tower::Service;
use tower::ServiceBuilder;
use tower::buffer::Buffer;
use tower::buffer::future::ResponseFuture;
use crate::ListenAddr;
use crate::graphql;
use crate::layers::ServiceBuilderExt;
use crate::notification::Notify;
use crate::router_factory::Endpoint;
use crate::services::execution;
use crate::services::router;
use crate::services::subgraph;
use crate::services::supergraph;
/// Signature of the function a [`PluginFactory`] stores to build a plugin instance.
///
/// It receives a [`PluginInit`] whose configuration is still raw JSON and returns a boxed
/// future resolving to a type-erased plugin (`Box<dyn DynPlugin>`) or an error.
type InstanceFactory =
fn(PluginInit<serde_json::Value>) -> BoxFuture<'static, Result<Box<dyn DynPlugin>, BoxError>>;
/// Signature of the function a [`PluginFactory`] stores to produce the JSON schema of a
/// plugin's configuration type from the supplied schema generator.
type SchemaFactory = fn(&mut SchemaGenerator) -> schemars::schema::Schema;
/// Distributed slice holding one lazily-initialised [`PluginFactory`] per registered plugin.
///
/// Entries are contributed by the `register_plugin!` and `register_private_plugin!` macros
/// defined in this file.
#[linkme::distributed_slice]
pub static PLUGINS: [Lazy<PluginFactory>] = [..];
/// Initialisation data handed to a plugin when it is constructed.
///
/// `T` is the type of the plugin configuration. Factories first receive a
/// `PluginInit<serde_json::Value>` and convert it to the plugin's own configuration type with
/// `with_deserialized_config`.
#[non_exhaustive]
pub struct PluginInit<T> {
/// The plugin configuration.
pub config: T,
/// The supergraph schema as SDL text.
pub supergraph_sdl: Arc<String>,
/// Identifier of the supergraph schema. The constructors in this file derive it from the SDL
/// with `crate::spec::Schema::schema_id`.
pub(crate) supergraph_schema_id: Arc<String>,
/// The parsed and validated supergraph schema.
pub(crate) supergraph_schema: Arc<Valid<Schema>>,
/// Parsed and validated subgraph schemas.
/// NOTE(review): the map key is presumably the subgraph name — confirm against the producer.
pub(crate) subgraph_schemas: Arc<HashMap<String, Arc<Valid<Schema>>>>,
/// Optional launch identifier; `None` when the builder was not given one.
pub(crate) launch_id: Option<Arc<String>>,
/// Notification handle keyed by `String` and carrying `graphql::Response` values.
pub(crate) notify: Notify<String, graphql::Response>,
}
impl<T> PluginInit<T>
where
    T: for<'de> Deserialize<'de>,
{
    /// Creates a `PluginInit` from an already-typed configuration and the supergraph SDL.
    ///
    /// The SDL is parsed and validated, the schema id is derived from it, and a fresh
    /// `Notify` instance is created.
    ///
    /// # Panics
    ///
    /// Panics if `supergraph_sdl` cannot be parsed and validated as a schema.
    #[deprecated = "use PluginInit::builder() instead"]
    pub fn new(config: T, supergraph_sdl: Arc<String>) -> Self {
        let parsed =
            Schema::parse_and_validate(supergraph_sdl.to_string(), PathBuf::from("synthetic"))
                .expect("failed to parse supergraph schema");
        let schema_id = crate::spec::Schema::schema_id(&supergraph_sdl).into_inner();

        Self::builder()
            .config(config)
            .supergraph_schema(Arc::new(parsed))
            .supergraph_schema_id(schema_id)
            .supergraph_sdl(supergraph_sdl)
            .notify(Notify::builder().build())
            .build()
    }

    /// Fallible counterpart of [`PluginInit::new`] that accepts the configuration as raw JSON.
    ///
    /// # Errors
    ///
    /// Returns an error when the SDL fails to parse/validate (the error carries the rendered
    /// validation errors) or when the builder fails to deserialize `config` into `T`.
    #[deprecated = "use PluginInit::try_builder() instead"]
    pub fn try_new(
        config: serde_json::Value,
        supergraph_sdl: Arc<String>,
    ) -> Result<Self, BoxError> {
        let parsed = match Schema::parse_and_validate(
            supergraph_sdl.to_string(),
            PathBuf::from("synthetic"),
        ) {
            Ok(schema) => schema,
            Err(invalid) => return Err(BoxError::from(invalid.errors.to_string())),
        };
        let schema_id = crate::spec::Schema::schema_id(&supergraph_sdl).into_inner();

        Self::try_builder()
            .config(config)
            .supergraph_schema(Arc::new(parsed))
            .supergraph_schema_id(schema_id)
            .supergraph_sdl(supergraph_sdl)
            .notify(Notify::builder().build())
            .build()
    }

    /// Test-only constructor.
    ///
    /// An empty SDL yields an empty schema that is assumed valid; any other SDL is parsed and
    /// validated (panicking on failure). The launch id is fixed to `"launch_id"` and the
    /// test flavour of `Notify` is used.
    #[cfg(test)]
    pub(crate) fn fake_new(config: T, supergraph_sdl: Arc<String>) -> Self {
        let parsed = if supergraph_sdl.is_empty() {
            Valid::assume_valid(Schema::new())
        } else {
            Schema::parse_and_validate(supergraph_sdl.to_string(), PathBuf::from("synthetic"))
                .expect("failed to parse supergraph schema")
        };
        let schema_id = crate::spec::Schema::schema_id(&supergraph_sdl).into_inner();

        Self::fake_builder()
            .config(config)
            .supergraph_schema_id(schema_id)
            .supergraph_sdl(supergraph_sdl)
            .supergraph_schema(Arc::new(parsed))
            .launch_id(Arc::new("launch_id".to_string()))
            .notify(Notify::for_tests())
            .build()
    }

    /// Returns a shared handle to the parsed supergraph schema.
    #[doc(hidden)]
    pub fn unsupported_supergraph_schema(&self) -> Arc<Valid<Schema>> {
        Arc::clone(&self.supergraph_schema)
    }

    /// Returns a shared handle to the map of parsed subgraph schemas.
    #[doc(hidden)]
    pub fn unsupported_subgraph_schemas(&self) -> Arc<HashMap<String, Arc<Valid<Schema>>>> {
        Arc::clone(&self.subgraph_schemas)
    }
}
#[buildstructor::buildstructor]
impl<T> PluginInit<T>
where
T: for<'de> Deserialize<'de>,
{
#[builder(entry = "builder", exit = "build", visibility = "pub")]
pub(crate) fn new_builder(
config: T,
supergraph_sdl: Arc<String>,
supergraph_schema_id: Arc<String>,
supergraph_schema: Arc<Valid<Schema>>,
subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
launch_id: Option<Option<Arc<String>>>,
notify: Notify<String, graphql::Response>,
) -> Self {
PluginInit {
config,
supergraph_sdl,
supergraph_schema_id,
supergraph_schema,
subgraph_schemas: subgraph_schemas.unwrap_or_default(),
launch_id: launch_id.flatten(),
notify,
}
}
#[builder(entry = "try_builder", exit = "build", visibility = "pub")]
pub(crate) fn try_new_builder(
config: serde_json::Value,
supergraph_sdl: Arc<String>,
supergraph_schema_id: Arc<String>,
supergraph_schema: Arc<Valid<Schema>>,
subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
launch_id: Option<Arc<String>>,
notify: Notify<String, graphql::Response>,
) -> Result<Self, BoxError> {
let config: T = serde_json::from_value(config)?;
Ok(PluginInit {
config,
supergraph_sdl,
supergraph_schema,
supergraph_schema_id,
subgraph_schemas: subgraph_schemas.unwrap_or_default(),
launch_id,
notify,
})
}
#[builder(entry = "fake_builder", exit = "build", visibility = "pub")]
fn fake_new_builder(
config: T,
supergraph_sdl: Option<Arc<String>>,
supergraph_schema_id: Option<Arc<String>>,
supergraph_schema: Option<Arc<Valid<Schema>>>,
subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
launch_id: Option<Arc<String>>,
notify: Option<Notify<String, graphql::Response>>,
) -> Self {
PluginInit {
config,
supergraph_sdl: supergraph_sdl.unwrap_or_default(),
supergraph_schema_id: supergraph_schema_id.unwrap_or_default(),
supergraph_schema: supergraph_schema
.unwrap_or_else(|| Arc::new(Valid::assume_valid(Schema::new()))),
subgraph_schemas: subgraph_schemas.unwrap_or_default(),
launch_id,
notify: notify.unwrap_or_else(Notify::for_tests),
}
}
}
impl PluginInit<serde_json::Value> {
    /// Converts this JSON-configured init into one carrying the typed configuration `T`.
    ///
    /// All other initialisation data (schemas, schema id, SDL, launch id and the notify
    /// handle) is moved into the result unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error when the JSON configuration cannot be deserialized into `T`.
    pub fn with_deserialized_config<T>(self) -> Result<PluginInit<T>, BoxError>
    where
        T: for<'de> Deserialize<'de>,
    {
        PluginInit::try_builder()
            .config(self.config)
            .supergraph_schema(self.supergraph_schema)
            .supergraph_schema_id(self.supergraph_schema_id)
            .supergraph_sdl(self.supergraph_sdl)
            .subgraph_schemas(self.subgraph_schemas)
            // Forward the launch id as well: leaving it out silently resets it to `None`
            // on the typed init that the plugin actually receives.
            .and_launch_id(self.launch_id)
            // `self` is consumed, so the handle can be moved instead of cloned.
            .notify(self.notify)
            .build()
    }
}
/// Factory that creates instances of one registered plugin type and describes the JSON
/// schema of its configuration.
#[derive(Clone)]
pub struct PluginFactory {
/// Registered plugin name: `name` alone when the group is empty, otherwise `group.name`.
pub(crate) name: String,
/// Copied from `PluginPrivate::HIDDEN_FROM_CONFIG_JSON_SCHEMA` by `new_private`;
/// always `false` for factories created with `new`.
pub(crate) hidden_from_config_json_schema: bool,
/// Function that asynchronously builds a type-erased plugin instance.
instance_factory: InstanceFactory,
/// Function that generates the JSON schema of the plugin's configuration type.
schema_factory: SchemaFactory,
/// `TypeId` of the concrete plugin type this factory was created for.
pub(crate) type_id: TypeId,
}
/// Debug output lists only the plugin name and type id; the two function pointers and the
/// schema-visibility flag are left out.
impl fmt::Debug for PluginFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { name, type_id, .. } = self;

        let mut out = f.debug_struct("PluginFactory");
        out.field("name", name);
        out.field("type_id", type_id);
        out.finish()
    }
}
impl PluginFactory {
pub(crate) fn is_apollo(&self) -> bool {
self.name.starts_with("apollo.") || self.name.starts_with("experimental.")
}
pub fn new<P: PluginUnstable>(group: &str, name: &str) -> PluginFactory {
let plugin_factory_name = if group.is_empty() {
name.to_string()
} else {
format!("{group}.{name}")
};
tracing::debug!(%plugin_factory_name, "creating plugin factory");
PluginFactory {
name: plugin_factory_name,
hidden_from_config_json_schema: false,
instance_factory: |init| {
Box::pin(async move {
let init = init.with_deserialized_config()?;
let plugin = P::new(init).await?;
Ok(Box::new(plugin) as Box<dyn DynPlugin>)
})
},
schema_factory: |gen| gen.subschema_for::<<P as PluginUnstable>::Config>(),
type_id: TypeId::of::<P>(),
}
}
pub(crate) fn new_private<P: PluginPrivate>(group: &str, name: &str) -> PluginFactory {
let plugin_factory_name = if group.is_empty() {
name.to_string()
} else {
format!("{group}.{name}")
};
tracing::debug!(%plugin_factory_name, "creating plugin factory");
PluginFactory {
name: plugin_factory_name,
hidden_from_config_json_schema: P::HIDDEN_FROM_CONFIG_JSON_SCHEMA,
instance_factory: |init| {
Box::pin(async move {
let init = init.with_deserialized_config()?;
let plugin = P::new(init).await?;
Ok(Box::new(plugin) as Box<dyn DynPlugin>)
})
},
schema_factory: |gen| gen.subschema_for::<<P as PluginPrivate>::Config>(),
type_id: TypeId::of::<P>(),
}
}
pub(crate) async fn create_instance(
&self,
init: PluginInit<serde_json::Value>,
) -> Result<Box<dyn DynPlugin>, BoxError> {
(self.instance_factory)(init).await
}
#[cfg(test)]
pub(crate) async fn create_instance_without_schema(
&self,
configuration: &serde_json::Value,
) -> Result<Box<dyn DynPlugin>, BoxError> {
(self.instance_factory)(
PluginInit::fake_builder()
.config(configuration.clone())
.build(),
)
.await
}
pub(crate) fn create_schema(&self, gen: &mut SchemaGenerator) -> schemars::schema::Schema {
(self.schema_factory)(gen)
}
}
/// Iterates over every plugin factory registered in the [`PLUGINS`] distributed slice.
pub(crate) fn plugins() -> impl Iterator<Item = &'static Lazy<PluginFactory>> {
PLUGINS.iter()
}
/// Public plugin interface.
///
/// Only [`Plugin::new`] and the `Config` type are required. Every service hook has a default
/// implementation that returns the service it was given unchanged, so a plugin overrides
/// only the hooks it needs.
#[async_trait]
pub trait Plugin: Send + Sync + 'static {
/// Configuration type of the plugin; it must be deserializable and describe its own JSON
/// schema.
type Config: JsonSchema + DeserializeOwned + Send;
/// Asynchronously constructs the plugin from its initialisation data.
async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
where
Self: Sized;
/// Receives the router service and returns the service to use in its place.
/// The default returns `service` unchanged.
fn router_service(&self, service: router::BoxService) -> router::BoxService {
service
}
/// Receives the supergraph service and returns the service to use in its place.
/// The default returns `service` unchanged.
fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
service
}
/// Receives the execution service and returns the service to use in its place.
/// The default returns `service` unchanged.
fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
service
}
/// Receives the service for the named subgraph and returns the service to use in its place.
/// The default ignores the subgraph name and returns `service` unchanged.
fn subgraph_service(
&self,
_subgraph_name: &str,
service: subgraph::BoxService,
) -> subgraph::BoxService {
service
}
/// Returns the plugin name; the default is the Rust type name of the implementing type.
fn name(&self) -> &'static str
where
Self: Sized,
{
get_type_of(self)
}
/// Returns the web endpoints contributed by the plugin, grouped by listen address.
/// The default contributes none.
fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
MultiMap::new()
}
}
/// Plugin interface mirroring [`Plugin`] with one additional required method,
/// `unstable_method`.
///
/// Every [`Plugin`] implements this trait through a blanket implementation in this file.
#[async_trait]
pub trait PluginUnstable: Send + Sync + 'static {
/// Configuration type of the plugin; it must be deserializable and describe its own JSON
/// schema.
type Config: JsonSchema + DeserializeOwned + Send;
/// Asynchronously constructs the plugin from its initialisation data.
async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
where
Self: Sized;
/// Receives the router service and returns the service to use in its place.
/// The default returns `service` unchanged.
fn router_service(&self, service: router::BoxService) -> router::BoxService {
service
}
/// Receives the supergraph service and returns the service to use in its place.
/// The default returns `service` unchanged.
fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
service
}
/// Receives the execution service and returns the service to use in its place.
/// The default returns `service` unchanged.
fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
service
}
/// Receives the service for the named subgraph and returns the service to use in its place.
/// The default ignores the subgraph name and returns `service` unchanged.
fn subgraph_service(
&self,
_subgraph_name: &str,
service: subgraph::BoxService,
) -> subgraph::BoxService {
service
}
/// Returns the plugin name; the default is the Rust type name of the implementing type.
fn name(&self) -> &'static str
where
Self: Sized,
{
get_type_of(self)
}
/// Returns the web endpoints contributed by the plugin, grouped by listen address.
/// The default contributes none.
fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
MultiMap::new()
}
/// Required method with no default implementation.
/// NOTE(review): its intended purpose is not visible in this file — confirm before relying on it.
fn unstable_method(&self);
}
/// Blanket implementation: every [`Plugin`] is a [`PluginUnstable`] whose methods forward to
/// the corresponding [`Plugin`] method.
#[async_trait]
impl<P> PluginUnstable for P
where
    P: Plugin,
{
    type Config = <P as Plugin>::Config;

    async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
    where
        Self: Sized,
    {
        <P as Plugin>::new(init).await
    }

    fn router_service(&self, service: router::BoxService) -> router::BoxService {
        <P as Plugin>::router_service(self, service)
    }

    fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
        <P as Plugin>::supergraph_service(self, service)
    }

    fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
        <P as Plugin>::execution_service(self, service)
    }

    fn subgraph_service(
        &self,
        subgraph_name: &str,
        service: subgraph::BoxService,
    ) -> subgraph::BoxService {
        <P as Plugin>::subgraph_service(self, subgraph_name, service)
    }

    fn name(&self) -> &'static str
    where
        Self: Sized,
    {
        <P as Plugin>::name(self)
    }

    fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
        <P as Plugin>::web_endpoints(self)
    }

    // `Plugin` has no counterpart for this method, so calling it on a plain `Plugin` panics.
    fn unstable_method(&self) {
        todo!()
    }
}
/// Crate-private plugin interface.
///
/// It offers the same hooks as [`PluginUnstable`] (minus `unstable_method`) and adds an HTTP
/// client service hook, an `activate` hook and a constant controlling visibility in the
/// configuration JSON schema. Every [`PluginUnstable`] implements this trait through a
/// blanket implementation in this file.
#[async_trait]
pub(crate) trait PluginPrivate: Send + Sync + 'static {
/// Configuration type of the plugin; it must be deserializable and describe its own JSON
/// schema.
type Config: JsonSchema + DeserializeOwned + Send;
/// Recorded on the `PluginFactory` as `hidden_from_config_json_schema`. Defaults to `false`.
const HIDDEN_FROM_CONFIG_JSON_SCHEMA: bool = false;
/// Asynchronously constructs the plugin from its initialisation data.
async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
where
Self: Sized;
/// Receives the router service and returns the service to use in its place.
/// The default returns `service` unchanged.
fn router_service(&self, service: router::BoxService) -> router::BoxService {
service
}
/// Receives the supergraph service and returns the service to use in its place.
/// The default returns `service` unchanged.
fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
service
}
/// Receives the execution service and returns the service to use in its place.
/// The default returns `service` unchanged.
fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
service
}
/// Receives the service for the named subgraph and returns the service to use in its place.
/// The default ignores the subgraph name and returns `service` unchanged.
fn subgraph_service(
&self,
_subgraph_name: &str,
service: subgraph::BoxService,
) -> subgraph::BoxService {
service
}
/// Receives the HTTP client service associated with the named subgraph and returns the
/// service to use in its place. The default returns `service` unchanged.
fn http_client_service(
&self,
_subgraph_name: &str,
service: crate::services::http::BoxService,
) -> crate::services::http::BoxService {
service
}
/// Returns the plugin name; the default is the Rust type name of the implementing type.
fn name(&self) -> &'static str
where
Self: Sized,
{
get_type_of(self)
}
/// Returns the web endpoints contributed by the plugin, grouped by listen address.
/// The default contributes none.
fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
MultiMap::new()
}
/// Activation hook; the default does nothing.
/// NOTE(review): when the caller invokes this is not visible in this file.
fn activate(&self) {}
}
/// Blanket implementation: every [`PluginUnstable`] is a [`PluginPrivate`] whose methods
/// forward to the corresponding [`PluginUnstable`] method. `http_client_service` keeps the
/// trait default and `activate` does nothing.
#[async_trait]
impl<P> PluginPrivate for P
where
    P: PluginUnstable,
{
    type Config = <P as PluginUnstable>::Config;

    async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
    where
        Self: Sized,
    {
        <P as PluginUnstable>::new(init).await
    }

    fn router_service(&self, service: router::BoxService) -> router::BoxService {
        <P as PluginUnstable>::router_service(self, service)
    }

    fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
        <P as PluginUnstable>::supergraph_service(self, service)
    }

    fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
        <P as PluginUnstable>::execution_service(self, service)
    }

    fn subgraph_service(
        &self,
        subgraph_name: &str,
        service: subgraph::BoxService,
    ) -> subgraph::BoxService {
        <P as PluginUnstable>::subgraph_service(self, subgraph_name, service)
    }

    fn name(&self) -> &'static str
    where
        Self: Sized,
    {
        <P as PluginUnstable>::name(self)
    }

    fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
        <P as PluginUnstable>::web_endpoints(self)
    }

    fn activate(&self) {}
}
/// Returns the compiler-provided name of `T`, the static type of the referenced value.
///
/// The value itself is never inspected; it only drives type inference.
fn get_type_of<T>(_value: &T) -> &'static str {
    use std::any::type_name;

    type_name::<T>()
}
/// Type-erased plugin interface used for boxed plugins (`Box<dyn DynPlugin>`).
///
/// It has no associated types or constructor; every [`PluginPrivate`] implements it through a
/// blanket implementation in this file.
#[async_trait]
pub(crate) trait DynPlugin: Send + Sync + 'static {
/// Receives the router service and returns the service to use in its place.
fn router_service(&self, service: router::BoxService) -> router::BoxService;
/// Receives the supergraph service and returns the service to use in its place.
fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService;
/// Receives the execution service and returns the service to use in its place.
fn execution_service(&self, service: execution::BoxService) -> execution::BoxService;
/// Receives the service for the named subgraph and returns the service to use in its place.
fn subgraph_service(
&self,
_subgraph_name: &str,
service: subgraph::BoxService,
) -> subgraph::BoxService;
/// Receives the HTTP client service associated with the named subgraph and returns the
/// service to use in its place.
fn http_client_service(
&self,
_subgraph_name: &str,
service: crate::services::http::BoxService,
) -> crate::services::http::BoxService;
/// Returns the plugin name.
fn name(&self) -> &'static str;
/// Returns the web endpoints contributed by the plugin, grouped by listen address.
fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint>;
/// Exposes the plugin as `&dyn Any` so callers can downcast to the concrete type.
fn as_any(&self) -> &dyn std::any::Any;
/// Test-only mutable counterpart of [`DynPlugin::as_any`].
#[cfg(test)]
fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
/// Activation hook; the default does nothing.
fn activate(&self) {}
}
/// Blanket implementation: every [`PluginPrivate`] is a [`DynPlugin`]. Each method forwards
/// to the `PluginPrivate` method of the same name; the `as_any*` methods return the plugin
/// itself.
#[async_trait]
impl<T> DynPlugin for T
where
    T: PluginPrivate,
    for<'de> <T as PluginPrivate>::Config: Deserialize<'de>,
{
    fn router_service(&self, service: router::BoxService) -> router::BoxService {
        <T as PluginPrivate>::router_service(self, service)
    }

    fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
        <T as PluginPrivate>::supergraph_service(self, service)
    }

    fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
        <T as PluginPrivate>::execution_service(self, service)
    }

    fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
        <T as PluginPrivate>::subgraph_service(self, name, service)
    }

    fn http_client_service(
        &self,
        name: &str,
        service: crate::services::http::BoxService,
    ) -> crate::services::http::BoxService {
        <T as PluginPrivate>::http_client_service(self, name, service)
    }

    fn name(&self) -> &'static str {
        <T as PluginPrivate>::name(self)
    }

    fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
        <T as PluginPrivate>::web_endpoints(self)
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    #[cfg(test)]
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    fn activate(&self) {
        <T as PluginPrivate>::activate(self)
    }
}
/// Boxes any [`PluginPrivate`] as a type-erased `Box<dyn DynPlugin>`.
impl<T> From<T> for Box<dyn DynPlugin>
where
    T: PluginPrivate,
{
    fn from(plugin: T) -> Self {
        Box::new(plugin) as Box<dyn DynPlugin>
    }
}
/// Registers a plugin type under the given group and name.
///
/// Expands to an anonymous constant that adds a lazily-built `PluginFactory`, created with
/// `PluginFactory::new`, to the `PLUGINS` distributed slice. The group and name must be
/// literals. The plugin type is either a single identifier or an identifier with exactly one
/// generic argument (`Type<Generic>`).
#[macro_export]
macro_rules! register_plugin {
// Arm for a plugin type with one generic argument.
($group: literal, $name: literal, $plugin_type: ident < $generic: ident >) => {
const _: () = {
use $crate::_private::PLUGINS;
use $crate::_private::PluginFactory;
use $crate::_private::once_cell::sync::Lazy;
#[$crate::_private::linkme::distributed_slice(PLUGINS)]
#[linkme(crate = $crate::_private::linkme)]
static REGISTER_PLUGIN: Lazy<PluginFactory> = Lazy::new(|| {
$crate::plugin::PluginFactory::new::<$plugin_type<$generic>>($group, $name)
});
};
};
// Arm for a plain (non-generic) plugin type.
($group: literal, $name: literal, $plugin_type: ident) => {
const _: () = {
use $crate::_private::PLUGINS;
use $crate::_private::PluginFactory;
use $crate::_private::once_cell::sync::Lazy;
#[$crate::_private::linkme::distributed_slice(PLUGINS)]
#[linkme(crate = $crate::_private::linkme)]
static REGISTER_PLUGIN: Lazy<PluginFactory> =
Lazy::new(|| $crate::plugin::PluginFactory::new::<$plugin_type>($group, $name));
};
};
}
/// Registers a crate-private plugin type under the given group and name.
///
/// Same shape as `register_plugin!`, but the factory is created with
/// `PluginFactory::new_private`, which is `pub(crate)`. The group and name must be literals.
/// The plugin type is either a single identifier or an identifier with exactly one generic
/// argument (`Type<Generic>`).
#[macro_export]
macro_rules! register_private_plugin {
// Arm for a plugin type with one generic argument.
($group: literal, $name: literal, $plugin_type: ident < $generic: ident >) => {
const _: () = {
use $crate::_private::PLUGINS;
use $crate::_private::PluginFactory;
use $crate::_private::once_cell::sync::Lazy;
#[$crate::_private::linkme::distributed_slice(PLUGINS)]
#[linkme(crate = $crate::_private::linkme)]
static REGISTER_PLUGIN: Lazy<PluginFactory> = Lazy::new(|| {
$crate::plugin::PluginFactory::new_private::<$plugin_type<$generic>>($group, $name)
});
};
};
// Arm for a plain (non-generic) plugin type.
($group: literal, $name: literal, $plugin_type: ident) => {
const _: () = {
use $crate::_private::PLUGINS;
use $crate::_private::PluginFactory;
use $crate::_private::once_cell::sync::Lazy;
#[$crate::_private::linkme::distributed_slice(PLUGINS)]
#[linkme(crate = $crate::_private::linkme)]
static REGISTER_PLUGIN: Lazy<PluginFactory> = Lazy::new(|| {
$crate::plugin::PluginFactory::new_private::<$plugin_type>($group, $name)
});
};
};
}
/// Cloneable handle to a router service.
///
/// The boxed router service is wrapped in a tower `Buffer`, which is what makes the handle
/// `Clone`.
#[derive(Clone)]
pub(crate) struct Handler {
/// The buffered router service every request is forwarded to.
service: Buffer<router::BoxService, router::Request>,
}
impl Handler {
    /// Wraps `service` in a buffer (via the `buffered()` builder extension) and stores it.
    pub(crate) fn new(service: router::BoxService) -> Self {
        let buffered = ServiceBuilder::new().buffered().service(service);
        Self { service: buffered }
    }
}
/// `Handler` is itself a router service: readiness checks and calls are forwarded verbatim
/// to the buffered inner service.
impl Service<router::Request> for Handler {
    type Response = router::Response;
    type Error = BoxError;
    type Future = ResponseFuture<BoxFuture<'static, Result<Self::Response, Self::Error>>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner = &mut self.service;
        inner.poll_ready(cx)
    }

    fn call(&mut self, req: router::Request) -> Self::Future {
        let inner = &mut self.service;
        inner.call(req)
    }
}
impl From<router::BoxService> for Handler {
fn from(original: router::BoxService) -> Self {
Self::new(original)
}
}