1pub mod serde;
18#[macro_use]
19pub mod test;
20
21use std::any::TypeId;
22use std::collections::HashMap;
23use std::fmt;
24use std::path::PathBuf;
25use std::sync::Arc;
26use std::task::Context;
27use std::task::Poll;
28
29use ::serde::Deserialize;
30use ::serde::de::DeserializeOwned;
31use apollo_compiler::Schema;
32use apollo_compiler::validation::Valid;
33use async_trait::async_trait;
34use futures::future::BoxFuture;
35use multimap::MultiMap;
36use once_cell::sync::Lazy;
37use schemars::JsonSchema;
38use schemars::gen::SchemaGenerator;
39use tower::BoxError;
40use tower::Service;
41use tower::ServiceBuilder;
42use tower::buffer::Buffer;
43use tower::buffer::future::ResponseFuture;
44
45use crate::ListenAddr;
46use crate::graphql;
47use crate::layers::ServiceBuilderExt;
48use crate::notification::Notify;
49use crate::router_factory::Endpoint;
50use crate::services::execution;
51use crate::services::router;
52use crate::services::subgraph;
53use crate::services::supergraph;
54
55type InstanceFactory =
56 fn(PluginInit<serde_json::Value>) -> BoxFuture<'static, Result<Box<dyn DynPlugin>, BoxError>>;
57
58type SchemaFactory = fn(&mut SchemaGenerator) -> schemars::schema::Schema;
59
60#[linkme::distributed_slice]
62pub static PLUGINS: [Lazy<PluginFactory>] = [..];
63
64#[non_exhaustive]
66pub struct PluginInit<T> {
67 pub config: T,
69 pub supergraph_sdl: Arc<String>,
71 pub(crate) supergraph_schema_id: Arc<String>,
73 pub(crate) supergraph_schema: Arc<Valid<Schema>>,
75
76 pub(crate) subgraph_schemas: Arc<HashMap<String, Arc<Valid<Schema>>>>,
78
79 pub(crate) launch_id: Option<Arc<String>>,
81
82 pub(crate) notify: Notify<String, graphql::Response>,
83}
84
85impl<T> PluginInit<T>
86where
87 T: for<'de> Deserialize<'de>,
88{
89 #[deprecated = "use PluginInit::builder() instead"]
90 pub fn new(config: T, supergraph_sdl: Arc<String>) -> Self {
92 Self::builder()
93 .config(config)
94 .supergraph_schema(Arc::new(
95 Schema::parse_and_validate(supergraph_sdl.to_string(), PathBuf::from("synthetic"))
96 .expect("failed to parse supergraph schema"),
97 ))
98 .supergraph_schema_id(crate::spec::Schema::schema_id(&supergraph_sdl).into_inner())
99 .supergraph_sdl(supergraph_sdl)
100 .notify(Notify::builder().build())
101 .build()
102 }
103
104 #[deprecated = "use PluginInit::try_builder() instead"]
109 pub fn try_new(
110 config: serde_json::Value,
111 supergraph_sdl: Arc<String>,
112 ) -> Result<Self, BoxError> {
113 Self::try_builder()
114 .config(config)
115 .supergraph_schema(Arc::new(
116 Schema::parse_and_validate(supergraph_sdl.to_string(), PathBuf::from("synthetic"))
117 .map_err(|e| {
118 BoxError::from(e.errors.to_string())
120 })?,
121 ))
122 .supergraph_schema_id(crate::spec::Schema::schema_id(&supergraph_sdl).into_inner())
123 .supergraph_sdl(supergraph_sdl)
124 .notify(Notify::builder().build())
125 .build()
126 }
127
128 #[cfg(test)]
129 pub(crate) fn fake_new(config: T, supergraph_sdl: Arc<String>) -> Self {
130 let supergraph_schema = Arc::new(if !supergraph_sdl.is_empty() {
131 Schema::parse_and_validate(supergraph_sdl.to_string(), PathBuf::from("synthetic"))
132 .expect("failed to parse supergraph schema")
133 } else {
134 Valid::assume_valid(Schema::new())
135 });
136
137 PluginInit::fake_builder()
138 .config(config)
139 .supergraph_schema_id(crate::spec::Schema::schema_id(&supergraph_sdl).into_inner())
140 .supergraph_sdl(supergraph_sdl)
141 .supergraph_schema(supergraph_schema)
142 .launch_id(Arc::new("launch_id".to_string()))
143 .notify(Notify::for_tests())
144 .build()
145 }
146
147 #[doc(hidden)]
150 pub fn unsupported_supergraph_schema(&self) -> Arc<Valid<Schema>> {
151 self.supergraph_schema.clone()
152 }
153
154 #[doc(hidden)]
158 pub fn unsupported_subgraph_schemas(&self) -> Arc<HashMap<String, Arc<Valid<Schema>>>> {
159 self.subgraph_schemas.clone()
160 }
161}
162
163#[buildstructor::buildstructor]
164impl<T> PluginInit<T>
165where
166 T: for<'de> Deserialize<'de>,
167{
168 #[builder(entry = "builder", exit = "build", visibility = "pub")]
170 pub(crate) fn new_builder(
174 config: T,
175 supergraph_sdl: Arc<String>,
176 supergraph_schema_id: Arc<String>,
177 supergraph_schema: Arc<Valid<Schema>>,
178 subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
179 launch_id: Option<Option<Arc<String>>>,
180 notify: Notify<String, graphql::Response>,
181 ) -> Self {
182 PluginInit {
183 config,
184 supergraph_sdl,
185 supergraph_schema_id,
186 supergraph_schema,
187 subgraph_schemas: subgraph_schemas.unwrap_or_default(),
188 launch_id: launch_id.flatten(),
189 notify,
190 }
191 }
192
193 #[builder(entry = "try_builder", exit = "build", visibility = "pub")]
194 pub(crate) fn try_new_builder(
199 config: serde_json::Value,
200 supergraph_sdl: Arc<String>,
201 supergraph_schema_id: Arc<String>,
202 supergraph_schema: Arc<Valid<Schema>>,
203 subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
204 launch_id: Option<Arc<String>>,
205 notify: Notify<String, graphql::Response>,
206 ) -> Result<Self, BoxError> {
207 let config: T = serde_json::from_value(config)?;
208 Ok(PluginInit {
209 config,
210 supergraph_sdl,
211 supergraph_schema,
212 supergraph_schema_id,
213 subgraph_schemas: subgraph_schemas.unwrap_or_default(),
214 launch_id,
215 notify,
216 })
217 }
218
219 #[builder(entry = "fake_builder", exit = "build", visibility = "pub")]
221 fn fake_new_builder(
222 config: T,
223 supergraph_sdl: Option<Arc<String>>,
224 supergraph_schema_id: Option<Arc<String>>,
225 supergraph_schema: Option<Arc<Valid<Schema>>>,
226 subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
227 launch_id: Option<Arc<String>>,
228 notify: Option<Notify<String, graphql::Response>>,
229 ) -> Self {
230 PluginInit {
231 config,
232 supergraph_sdl: supergraph_sdl.unwrap_or_default(),
233 supergraph_schema_id: supergraph_schema_id.unwrap_or_default(),
234 supergraph_schema: supergraph_schema
235 .unwrap_or_else(|| Arc::new(Valid::assume_valid(Schema::new()))),
236 subgraph_schemas: subgraph_schemas.unwrap_or_default(),
237 launch_id,
238 notify: notify.unwrap_or_else(Notify::for_tests),
239 }
240 }
241}
242
243impl PluginInit<serde_json::Value> {
244 pub fn with_deserialized_config<T>(self) -> Result<PluginInit<T>, BoxError>
246 where
247 T: for<'de> Deserialize<'de>,
248 {
249 PluginInit::try_builder()
250 .config(self.config)
251 .supergraph_schema(self.supergraph_schema)
252 .supergraph_schema_id(self.supergraph_schema_id)
253 .supergraph_sdl(self.supergraph_sdl)
254 .subgraph_schemas(self.subgraph_schemas)
255 .notify(self.notify.clone())
256 .build()
257 }
258}
259
260#[derive(Clone)]
262pub struct PluginFactory {
263 pub(crate) name: String,
264 pub(crate) hidden_from_config_json_schema: bool,
265 instance_factory: InstanceFactory,
266 schema_factory: SchemaFactory,
267 pub(crate) type_id: TypeId,
268}
269
270impl fmt::Debug for PluginFactory {
271 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272 f.debug_struct("PluginFactory")
273 .field("name", &self.name)
274 .field("type_id", &self.type_id)
275 .finish()
276 }
277}
278
279impl PluginFactory {
280 pub(crate) fn is_apollo(&self) -> bool {
281 self.name.starts_with("apollo.") || self.name.starts_with("experimental.")
282 }
283
284 pub fn new<P: PluginUnstable>(group: &str, name: &str) -> PluginFactory {
286 let plugin_factory_name = if group.is_empty() {
287 name.to_string()
288 } else {
289 format!("{group}.{name}")
290 };
291 tracing::debug!(%plugin_factory_name, "creating plugin factory");
292 PluginFactory {
293 name: plugin_factory_name,
294 hidden_from_config_json_schema: false,
295 instance_factory: |init| {
296 Box::pin(async move {
297 let init = init.with_deserialized_config()?;
298 let plugin = P::new(init).await?;
299 Ok(Box::new(plugin) as Box<dyn DynPlugin>)
300 })
301 },
302 schema_factory: |gen| gen.subschema_for::<<P as PluginUnstable>::Config>(),
303 type_id: TypeId::of::<P>(),
304 }
305 }
306
307 pub(crate) fn new_private<P: PluginPrivate>(group: &str, name: &str) -> PluginFactory {
309 let plugin_factory_name = if group.is_empty() {
310 name.to_string()
311 } else {
312 format!("{group}.{name}")
313 };
314 tracing::debug!(%plugin_factory_name, "creating plugin factory");
315 PluginFactory {
316 name: plugin_factory_name,
317 hidden_from_config_json_schema: P::HIDDEN_FROM_CONFIG_JSON_SCHEMA,
318 instance_factory: |init| {
319 Box::pin(async move {
320 let init = init.with_deserialized_config()?;
321 let plugin = P::new(init).await?;
322 Ok(Box::new(plugin) as Box<dyn DynPlugin>)
323 })
324 },
325 schema_factory: |gen| gen.subschema_for::<<P as PluginPrivate>::Config>(),
326 type_id: TypeId::of::<P>(),
327 }
328 }
329
330 pub(crate) async fn create_instance(
331 &self,
332 init: PluginInit<serde_json::Value>,
333 ) -> Result<Box<dyn DynPlugin>, BoxError> {
334 (self.instance_factory)(init).await
335 }
336
337 #[cfg(test)]
338 pub(crate) async fn create_instance_without_schema(
339 &self,
340 configuration: &serde_json::Value,
341 ) -> Result<Box<dyn DynPlugin>, BoxError> {
342 (self.instance_factory)(
343 PluginInit::fake_builder()
344 .config(configuration.clone())
345 .build(),
346 )
347 .await
348 }
349
350 pub(crate) fn create_schema(&self, gen: &mut SchemaGenerator) -> schemars::schema::Schema {
351 (self.schema_factory)(gen)
352 }
353}
354
355pub(crate) fn plugins() -> impl Iterator<Item = &'static Lazy<PluginFactory>> {
358 PLUGINS.iter()
359}
360
361#[async_trait]
367pub trait Plugin: Send + Sync + 'static {
368 type Config: JsonSchema + DeserializeOwned + Send;
377
378 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
381 where
382 Self: Sized;
383
384 fn router_service(&self, service: router::BoxService) -> router::BoxService {
391 service
392 }
393
394 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
398 service
399 }
400
401 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
404 service
405 }
406
407 fn subgraph_service(
411 &self,
412 _subgraph_name: &str,
413 service: subgraph::BoxService,
414 ) -> subgraph::BoxService {
415 service
416 }
417
418 fn name(&self) -> &'static str
420 where
421 Self: Sized,
422 {
423 get_type_of(self)
424 }
425
426 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
430 MultiMap::new()
431 }
432}
433
434#[async_trait]
441pub trait PluginUnstable: Send + Sync + 'static {
442 type Config: JsonSchema + DeserializeOwned + Send;
451
452 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
455 where
456 Self: Sized;
457
458 fn router_service(&self, service: router::BoxService) -> router::BoxService {
465 service
466 }
467
468 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
472 service
473 }
474
475 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
478 service
479 }
480
481 fn subgraph_service(
485 &self,
486 _subgraph_name: &str,
487 service: subgraph::BoxService,
488 ) -> subgraph::BoxService {
489 service
490 }
491
492 fn name(&self) -> &'static str
494 where
495 Self: Sized,
496 {
497 get_type_of(self)
498 }
499
500 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
504 MultiMap::new()
505 }
506
507 fn unstable_method(&self);
509}
510
511#[async_trait]
512impl<P> PluginUnstable for P
513where
514 P: Plugin,
515{
516 type Config = <P as Plugin>::Config;
517
518 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
519 where
520 Self: Sized,
521 {
522 Plugin::new(init).await
523 }
524
525 fn router_service(&self, service: router::BoxService) -> router::BoxService {
526 Plugin::router_service(self, service)
527 }
528
529 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
530 Plugin::supergraph_service(self, service)
531 }
532
533 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
534 Plugin::execution_service(self, service)
535 }
536
537 fn subgraph_service(
538 &self,
539 subgraph_name: &str,
540 service: subgraph::BoxService,
541 ) -> subgraph::BoxService {
542 Plugin::subgraph_service(self, subgraph_name, service)
543 }
544
545 fn name(&self) -> &'static str
547 where
548 Self: Sized,
549 {
550 Plugin::name(self)
551 }
552
553 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
554 Plugin::web_endpoints(self)
555 }
556
557 fn unstable_method(&self) {
558 todo!()
559 }
560}
561
562#[async_trait]
570pub(crate) trait PluginPrivate: Send + Sync + 'static {
571 type Config: JsonSchema + DeserializeOwned + Send;
580
581 const HIDDEN_FROM_CONFIG_JSON_SCHEMA: bool = false;
582
583 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
586 where
587 Self: Sized;
588
589 fn router_service(&self, service: router::BoxService) -> router::BoxService {
596 service
597 }
598
599 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
603 service
604 }
605
606 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
609 service
610 }
611
612 fn subgraph_service(
616 &self,
617 _subgraph_name: &str,
618 service: subgraph::BoxService,
619 ) -> subgraph::BoxService {
620 service
621 }
622
623 fn http_client_service(
625 &self,
626 _subgraph_name: &str,
627 service: crate::services::http::BoxService,
628 ) -> crate::services::http::BoxService {
629 service
630 }
631
632 fn name(&self) -> &'static str
634 where
635 Self: Sized,
636 {
637 get_type_of(self)
638 }
639
640 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
644 MultiMap::new()
645 }
646
647 fn activate(&self) {}
649}
650
651#[async_trait]
652impl<P> PluginPrivate for P
653where
654 P: PluginUnstable,
655{
656 type Config = <P as PluginUnstable>::Config;
657
658 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
659 where
660 Self: Sized,
661 {
662 PluginUnstable::new(init).await
663 }
664
665 fn router_service(&self, service: router::BoxService) -> router::BoxService {
666 PluginUnstable::router_service(self, service)
667 }
668
669 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
670 PluginUnstable::supergraph_service(self, service)
671 }
672
673 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
674 PluginUnstable::execution_service(self, service)
675 }
676
677 fn subgraph_service(
678 &self,
679 subgraph_name: &str,
680 service: subgraph::BoxService,
681 ) -> subgraph::BoxService {
682 PluginUnstable::subgraph_service(self, subgraph_name, service)
683 }
684
685 fn name(&self) -> &'static str
687 where
688 Self: Sized,
689 {
690 PluginUnstable::name(self)
691 }
692
693 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
694 PluginUnstable::web_endpoints(self)
695 }
696
697 fn activate(&self) {}
698}
699
700fn get_type_of<T>(_: &T) -> &'static str {
701 std::any::type_name::<T>()
702}
703
704#[async_trait]
710pub(crate) trait DynPlugin: Send + Sync + 'static {
711 fn router_service(&self, service: router::BoxService) -> router::BoxService;
716
717 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService;
721
722 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService;
725
726 fn subgraph_service(
730 &self,
731 _subgraph_name: &str,
732 service: subgraph::BoxService,
733 ) -> subgraph::BoxService;
734
735 fn http_client_service(
737 &self,
738 _subgraph_name: &str,
739 service: crate::services::http::BoxService,
740 ) -> crate::services::http::BoxService;
741
742 fn name(&self) -> &'static str;
744
745 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint>;
747
748 fn as_any(&self) -> &dyn std::any::Any;
750
751 #[cfg(test)]
753 fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
754
755 fn activate(&self) {}
757}
758
759#[async_trait]
760impl<T> DynPlugin for T
761where
762 T: PluginPrivate,
763 for<'de> <T as PluginPrivate>::Config: Deserialize<'de>,
764{
765 fn router_service(&self, service: router::BoxService) -> router::BoxService {
766 self.router_service(service)
767 }
768
769 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
770 self.supergraph_service(service)
771 }
772
773 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
774 self.execution_service(service)
775 }
776
777 fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
778 self.subgraph_service(name, service)
779 }
780
781 fn http_client_service(
783 &self,
784 name: &str,
785 service: crate::services::http::BoxService,
786 ) -> crate::services::http::BoxService {
787 self.http_client_service(name, service)
788 }
789
790 fn name(&self) -> &'static str {
791 self.name()
792 }
793
794 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
796 self.web_endpoints()
797 }
798
799 fn as_any(&self) -> &dyn std::any::Any {
800 self
801 }
802
803 #[cfg(test)]
804 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
805 self
806 }
807
808 fn activate(&self) {
809 self.activate()
810 }
811}
812
813impl<T> From<T> for Box<dyn DynPlugin>
814where
815 T: PluginPrivate,
816{
817 fn from(value: T) -> Self {
818 Box::new(value)
819 }
820}
821
822#[macro_export]
826macro_rules! register_plugin {
827 ($group: literal, $name: literal, $plugin_type: ident < $generic: ident >) => {
828 const _: () = {
830 use $crate::_private::PLUGINS;
831 use $crate::_private::PluginFactory;
832 use $crate::_private::once_cell::sync::Lazy;
833
834 #[$crate::_private::linkme::distributed_slice(PLUGINS)]
835 #[linkme(crate = $crate::_private::linkme)]
836 static REGISTER_PLUGIN: Lazy<PluginFactory> = Lazy::new(|| {
837 $crate::plugin::PluginFactory::new::<$plugin_type<$generic>>($group, $name)
838 });
839 };
840 };
841
842 ($group: literal, $name: literal, $plugin_type: ident) => {
843 const _: () = {
845 use $crate::_private::PLUGINS;
846 use $crate::_private::PluginFactory;
847 use $crate::_private::once_cell::sync::Lazy;
848
849 #[$crate::_private::linkme::distributed_slice(PLUGINS)]
850 #[linkme(crate = $crate::_private::linkme)]
851 static REGISTER_PLUGIN: Lazy<PluginFactory> =
852 Lazy::new(|| $crate::plugin::PluginFactory::new::<$plugin_type>($group, $name));
853 };
854 };
855}
856
857#[macro_export]
861macro_rules! register_private_plugin {
862 ($group: literal, $name: literal, $plugin_type: ident < $generic: ident >) => {
863 const _: () = {
865 use $crate::_private::PLUGINS;
866 use $crate::_private::PluginFactory;
867 use $crate::_private::once_cell::sync::Lazy;
868
869 #[$crate::_private::linkme::distributed_slice(PLUGINS)]
870 #[linkme(crate = $crate::_private::linkme)]
871 static REGISTER_PLUGIN: Lazy<PluginFactory> = Lazy::new(|| {
872 $crate::plugin::PluginFactory::new_private::<$plugin_type<$generic>>($group, $name)
873 });
874 };
875 };
876
877 ($group: literal, $name: literal, $plugin_type: ident) => {
878 const _: () = {
880 use $crate::_private::PLUGINS;
881 use $crate::_private::PluginFactory;
882 use $crate::_private::once_cell::sync::Lazy;
883
884 #[$crate::_private::linkme::distributed_slice(PLUGINS)]
885 #[linkme(crate = $crate::_private::linkme)]
886 static REGISTER_PLUGIN: Lazy<PluginFactory> = Lazy::new(|| {
887 $crate::plugin::PluginFactory::new_private::<$plugin_type>($group, $name)
888 });
889 };
890 };
891}
892
893#[derive(Clone)]
895pub(crate) struct Handler {
896 service: Buffer<router::BoxService, router::Request>,
897}
898
899impl Handler {
900 pub(crate) fn new(service: router::BoxService) -> Self {
901 Self {
902 service: ServiceBuilder::new().buffered().service(service),
903 }
904 }
905}
906
907impl Service<router::Request> for Handler {
908 type Response = router::Response;
909 type Error = BoxError;
910 type Future = ResponseFuture<BoxFuture<'static, Result<Self::Response, Self::Error>>>;
911
912 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
913 self.service.poll_ready(cx)
914 }
915
916 fn call(&mut self, req: router::Request) -> Self::Future {
917 self.service.call(req)
918 }
919}
920
921impl From<router::BoxService> for Handler {
922 fn from(original: router::BoxService) -> Self {
923 Self::new(original)
924 }
925}