1pub mod serde;
18#[macro_use]
19pub mod test;
20
21use std::any::TypeId;
22use std::collections::HashMap;
23use std::fmt;
24#[cfg(test)]
25use std::path::PathBuf;
26use std::sync::Arc;
27use std::task::Context;
28use std::task::Poll;
29
30use ::serde::Deserialize;
31use ::serde::de::DeserializeOwned;
32use apollo_compiler::Schema;
33use apollo_compiler::validation::Valid;
34use async_trait::async_trait;
35use futures::future::BoxFuture;
36use multimap::MultiMap;
37use once_cell::sync::Lazy;
38use schemars::JsonSchema;
39use schemars::SchemaGenerator;
40use serde_json::Value;
41use tower::BoxError;
42use tower::Service;
43use tower::ServiceBuilder;
44use tower::buffer::Buffer;
45use tower::buffer::future::ResponseFuture;
46
47use crate::ListenAddr;
48use crate::graphql;
49use crate::layers::ServiceBuilderExt;
50use crate::plugins::subscription::notification::Notify;
51use crate::router_factory::Endpoint;
52use crate::services::execution;
53use crate::services::router;
54use crate::services::subgraph;
55use crate::services::supergraph;
56use crate::uplink::license_enforcement::LicenseState;
57
58type InstanceFactory =
59 fn(PluginInit<serde_json::Value>) -> BoxFuture<'static, Result<Box<dyn DynPlugin>, BoxError>>;
60
61type SchemaFactory = fn(&mut SchemaGenerator) -> schemars::Schema;
62
63#[linkme::distributed_slice]
65pub static PLUGINS: [Lazy<PluginFactory>] = [..];
66
67#[non_exhaustive]
69pub struct PluginInit<T> {
70 pub config: T,
72 pub(crate) previous_config: Option<T>,
74 pub supergraph_sdl: Arc<String>,
76 pub(crate) supergraph_schema_id: Arc<String>,
78 pub(crate) supergraph_schema: Arc<Valid<Schema>>,
80
81 pub(crate) subgraph_schemas: Arc<HashMap<String, Arc<Valid<Schema>>>>,
83
84 pub(crate) launch_id: Option<Arc<String>>,
86
87 pub(crate) notify: Notify<String, graphql::Response>,
88
89 pub(crate) license: Arc<LicenseState>,
91
92 pub(crate) full_config: Option<Value>,
96
97 pub(crate) raw_yaml: Option<Arc<str>>,
99}
100
101impl<T> PluginInit<T>
102where
103 T: for<'de> Deserialize<'de>,
104{
105 #[cfg(test)]
106 pub(crate) fn fake_new(config: T, supergraph_sdl: Arc<String>) -> Self {
107 let supergraph_schema = Arc::new(if !supergraph_sdl.is_empty() {
108 Schema::parse_and_validate(supergraph_sdl.to_string(), PathBuf::from("synthetic"))
109 .expect("failed to parse supergraph schema")
110 } else {
111 Valid::assume_valid(Schema::new())
112 });
113
114 PluginInit::fake_builder()
115 .config(config)
116 .supergraph_schema_id(crate::spec::Schema::schema_id(&supergraph_sdl).into_inner())
117 .supergraph_sdl(supergraph_sdl)
118 .supergraph_schema(supergraph_schema)
119 .launch_id(Arc::new("launch_id".to_string()))
120 .notify(Notify::for_tests())
121 .license(Arc::new(LicenseState::default()))
122 .build()
123 }
124}
125
126#[buildstructor::buildstructor]
127impl<T> PluginInit<T>
128where
129 T: for<'de> Deserialize<'de>,
130{
131 #[builder(entry = "builder", exit = "build", visibility = "pub")]
133 pub(crate) fn new_builder(
137 config: T,
138 previous_config: Option<T>,
139 supergraph_sdl: Arc<String>,
140 supergraph_schema_id: Arc<String>,
141 supergraph_schema: Arc<Valid<Schema>>,
142 subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
143 launch_id: Option<Option<Arc<String>>>,
144 notify: Notify<String, graphql::Response>,
145 license: Arc<LicenseState>,
146 full_config: Option<Value>,
147 original_config_yaml: Option<Arc<str>>,
148 ) -> Self {
149 PluginInit {
150 config,
151 previous_config,
152 supergraph_sdl,
153 supergraph_schema_id,
154 supergraph_schema,
155 subgraph_schemas: subgraph_schemas.unwrap_or_default(),
156 launch_id: launch_id.flatten(),
157 notify,
158 license,
159 full_config,
160 raw_yaml: original_config_yaml,
161 }
162 }
163
164 #[builder(entry = "try_builder", exit = "build", visibility = "pub")]
165 pub(crate) fn try_new_builder(
170 config: serde_json::Value,
171 previous_config: Option<serde_json::Value>,
172 supergraph_sdl: Arc<String>,
173 supergraph_schema_id: Arc<String>,
174 supergraph_schema: Arc<Valid<Schema>>,
175 subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
176 launch_id: Option<Arc<String>>,
177 notify: Notify<String, graphql::Response>,
178 license: Arc<LicenseState>,
179 full_config: Option<Value>,
180 original_config_yaml: Option<Arc<str>>,
181 ) -> Result<Self, BoxError> {
182 let config: T = serde_json::from_value(config)?;
183 let previous_config = previous_config.map(serde_json::from_value).transpose()?;
184 Ok(PluginInit {
185 config,
186 previous_config,
187 supergraph_sdl,
188 supergraph_schema,
189 supergraph_schema_id,
190 subgraph_schemas: subgraph_schemas.unwrap_or_default(),
191 launch_id,
192 notify,
193 license,
194 full_config,
195 raw_yaml: original_config_yaml,
196 })
197 }
198
199 #[builder(entry = "fake_builder", exit = "build", visibility = "pub")]
201 fn fake_new_builder(
202 config: T,
203 previous_config: Option<T>,
204 supergraph_sdl: Option<Arc<String>>,
205 supergraph_schema_id: Option<Arc<String>>,
206 supergraph_schema: Option<Arc<Valid<Schema>>>,
207 subgraph_schemas: Option<Arc<HashMap<String, Arc<Valid<Schema>>>>>,
208 launch_id: Option<Arc<String>>,
209 notify: Option<Notify<String, graphql::Response>>,
210 license: Option<Arc<LicenseState>>,
211 full_config: Option<Value>,
212 original_config_yaml: Option<Arc<str>>,
213 ) -> Self {
214 PluginInit {
215 config,
216 previous_config,
217 supergraph_sdl: supergraph_sdl.unwrap_or_default(),
218 supergraph_schema_id: supergraph_schema_id.unwrap_or_default(),
219 supergraph_schema: supergraph_schema
220 .unwrap_or_else(|| Arc::new(Valid::assume_valid(Schema::new()))),
221 subgraph_schemas: subgraph_schemas.unwrap_or_default(),
222 launch_id,
223 notify: notify.unwrap_or_else(Notify::for_tests),
224 license: license.unwrap_or_default(),
225 full_config,
226 raw_yaml: original_config_yaml,
227 }
228 }
229}
230
231impl PluginInit<serde_json::Value> {
232 pub fn with_deserialized_config<T>(self) -> Result<PluginInit<T>, BoxError>
234 where
235 T: for<'de> Deserialize<'de>,
236 {
237 PluginInit::try_builder()
238 .config(self.config)
239 .and_previous_config(self.previous_config)
240 .supergraph_schema(self.supergraph_schema)
241 .supergraph_schema_id(self.supergraph_schema_id)
242 .supergraph_sdl(self.supergraph_sdl)
243 .subgraph_schemas(self.subgraph_schemas)
244 .notify(self.notify.clone())
245 .license(self.license)
246 .and_full_config(self.full_config)
247 .and_original_config_yaml(self.raw_yaml)
248 .build()
249 }
250}
251
252#[derive(Clone)]
254pub struct PluginFactory {
255 pub(crate) name: String,
256 pub(crate) hidden_from_config_json_schema: bool,
257 instance_factory: InstanceFactory,
258 schema_factory: SchemaFactory,
259 pub(crate) type_id: TypeId,
260}
261
262impl fmt::Debug for PluginFactory {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 f.debug_struct("PluginFactory")
265 .field("name", &self.name)
266 .field("type_id", &self.type_id)
267 .finish()
268 }
269}
270
271impl PluginFactory {
272 pub(crate) fn is_apollo(&self) -> bool {
273 self.name.starts_with("apollo.") || self.name.starts_with("experimental.")
274 }
275
276 pub fn new<P: PluginUnstable>(group: &str, name: &str) -> PluginFactory {
278 let plugin_factory_name = if group.is_empty() {
279 name.to_string()
280 } else {
281 format!("{group}.{name}")
282 };
283 tracing::debug!(%plugin_factory_name, "creating plugin factory");
284 PluginFactory {
285 name: plugin_factory_name,
286 hidden_from_config_json_schema: false,
287 instance_factory: |init| {
288 Box::pin(async move {
289 let init = init.with_deserialized_config()?;
290 let plugin = P::new(init).await?;
291 Ok(Box::new(plugin) as Box<dyn DynPlugin>)
292 })
293 },
294 schema_factory: |generator| generator.subschema_for::<<P as PluginUnstable>::Config>(),
295 type_id: TypeId::of::<P>(),
296 }
297 }
298
299 pub(crate) fn new_private<P: PluginPrivate>(group: &str, name: &str) -> PluginFactory {
301 let plugin_factory_name = if group.is_empty() {
302 name.to_string()
303 } else {
304 format!("{group}.{name}")
305 };
306 tracing::debug!(%plugin_factory_name, "creating plugin factory");
307 PluginFactory {
308 name: plugin_factory_name,
309 hidden_from_config_json_schema: P::HIDDEN_FROM_CONFIG_JSON_SCHEMA,
310 instance_factory: |init| {
311 Box::pin(async move {
312 let init = init.with_deserialized_config()?;
313 let plugin = P::new(init).await?;
314 Ok(Box::new(plugin) as Box<dyn DynPlugin>)
315 })
316 },
317 schema_factory: |generator| generator.subschema_for::<<P as PluginPrivate>::Config>(),
318 type_id: TypeId::of::<P>(),
319 }
320 }
321
322 pub(crate) async fn create_instance(
323 &self,
324 init: PluginInit<serde_json::Value>,
325 ) -> Result<Box<dyn DynPlugin>, BoxError> {
326 (self.instance_factory)(init).await
327 }
328
329 #[cfg(test)]
330 pub(crate) async fn create_instance_without_schema(
331 &self,
332 configuration: &serde_json::Value,
333 ) -> Result<Box<dyn DynPlugin>, BoxError> {
334 (self.instance_factory)(
335 PluginInit::fake_builder()
336 .config(configuration.clone())
337 .build(),
338 )
339 .await
340 }
341
342 pub(crate) fn create_schema(&self, generator: &mut SchemaGenerator) -> schemars::Schema {
343 (self.schema_factory)(generator)
344 }
345}
346
347pub(crate) fn plugins() -> impl Iterator<Item = &'static Lazy<PluginFactory>> {
350 PLUGINS.iter()
351}
352
353#[async_trait]
359pub trait Plugin: Send + Sync + 'static {
360 type Config: JsonSchema + DeserializeOwned + Send;
369
370 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
373 where
374 Self: Sized;
375
376 fn router_service(&self, service: router::BoxService) -> router::BoxService {
383 service
384 }
385
386 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
390 service
391 }
392
393 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
396 service
397 }
398
399 fn subgraph_service(
403 &self,
404 _subgraph_name: &str,
405 service: subgraph::BoxService,
406 ) -> subgraph::BoxService {
407 service
408 }
409
410 fn name(&self) -> &'static str
412 where
413 Self: Sized,
414 {
415 get_type_of(self)
416 }
417
418 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
422 MultiMap::new()
423 }
424}
425
426#[async_trait]
433pub trait PluginUnstable: Send + Sync + 'static {
434 type Config: JsonSchema + DeserializeOwned + Send;
443
444 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
447 where
448 Self: Sized;
449
450 fn router_service(&self, service: router::BoxService) -> router::BoxService {
457 service
458 }
459
460 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
464 service
465 }
466
467 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
470 service
471 }
472
473 fn subgraph_service(
477 &self,
478 _subgraph_name: &str,
479 service: subgraph::BoxService,
480 ) -> subgraph::BoxService {
481 service
482 }
483
484 fn name(&self) -> &'static str
486 where
487 Self: Sized,
488 {
489 get_type_of(self)
490 }
491
492 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
496 MultiMap::new()
497 }
498
499 fn unstable_method(&self);
501}
502
503#[async_trait]
504impl<P> PluginUnstable for P
505where
506 P: Plugin,
507{
508 type Config = <P as Plugin>::Config;
509
510 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
511 where
512 Self: Sized,
513 {
514 Plugin::new(init).await
515 }
516
517 fn router_service(&self, service: router::BoxService) -> router::BoxService {
518 Plugin::router_service(self, service)
519 }
520
521 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
522 Plugin::supergraph_service(self, service)
523 }
524
525 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
526 Plugin::execution_service(self, service)
527 }
528
529 fn subgraph_service(
530 &self,
531 subgraph_name: &str,
532 service: subgraph::BoxService,
533 ) -> subgraph::BoxService {
534 Plugin::subgraph_service(self, subgraph_name, service)
535 }
536
537 fn name(&self) -> &'static str
539 where
540 Self: Sized,
541 {
542 Plugin::name(self)
543 }
544
545 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
546 Plugin::web_endpoints(self)
547 }
548
549 fn unstable_method(&self) {
550 todo!()
551 }
552}
553
554#[async_trait]
562pub(crate) trait PluginPrivate: Send + Sync + 'static {
563 type Config: JsonSchema + DeserializeOwned + Send;
572
573 const HIDDEN_FROM_CONFIG_JSON_SCHEMA: bool = false;
574
575 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
578 where
579 Self: Sized;
580
581 fn router_service(&self, service: router::BoxService) -> router::BoxService {
588 service
589 }
590
591 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
595 service
596 }
597
598 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
601 service
602 }
603
604 fn subgraph_service(
608 &self,
609 _subgraph_name: &str,
610 service: subgraph::BoxService,
611 ) -> subgraph::BoxService {
612 service
613 }
614
615 fn http_client_service(
617 &self,
618 _subgraph_name: &str,
619 service: crate::services::http::BoxService,
620 ) -> crate::services::http::BoxService {
621 service
622 }
623
624 fn connector_request_service(
626 &self,
627 service: crate::services::connector::request_service::BoxService,
628 _source_name: String,
629 ) -> crate::services::connector::request_service::BoxService {
630 service
631 }
632
633 fn name(&self) -> &'static str
635 where
636 Self: Sized,
637 {
638 get_type_of(self)
639 }
640
641 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
645 MultiMap::new()
646 }
647
648 fn activate(&self) {}
650}
651
652#[async_trait]
653impl<P> PluginPrivate for P
654where
655 P: PluginUnstable,
656{
657 type Config = <P as PluginUnstable>::Config;
658
659 async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
660 where
661 Self: Sized,
662 {
663 PluginUnstable::new(init).await
664 }
665
666 fn router_service(&self, service: router::BoxService) -> router::BoxService {
667 PluginUnstable::router_service(self, service)
668 }
669
670 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
671 PluginUnstable::supergraph_service(self, service)
672 }
673
674 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
675 PluginUnstable::execution_service(self, service)
676 }
677
678 fn subgraph_service(
679 &self,
680 subgraph_name: &str,
681 service: subgraph::BoxService,
682 ) -> subgraph::BoxService {
683 PluginUnstable::subgraph_service(self, subgraph_name, service)
684 }
685
686 fn name(&self) -> &'static str
688 where
689 Self: Sized,
690 {
691 PluginUnstable::name(self)
692 }
693
694 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
695 PluginUnstable::web_endpoints(self)
696 }
697
698 fn activate(&self) {}
699}
700
701fn get_type_of<T>(_: &T) -> &'static str {
702 std::any::type_name::<T>()
703}
704
705#[async_trait]
711pub(crate) trait DynPlugin: Send + Sync + 'static {
712 fn router_service(&self, service: router::BoxService) -> router::BoxService;
717
718 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService;
722
723 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService;
726
727 fn subgraph_service(
731 &self,
732 _subgraph_name: &str,
733 service: subgraph::BoxService,
734 ) -> subgraph::BoxService;
735
736 fn http_client_service(
738 &self,
739 _subgraph_name: &str,
740 service: crate::services::http::BoxService,
741 ) -> crate::services::http::BoxService;
742
743 fn connector_request_service(
745 &self,
746 service: crate::services::connector::request_service::BoxService,
747 source_name: String,
748 ) -> crate::services::connector::request_service::BoxService;
749
750 fn name(&self) -> &'static str;
752
753 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint>;
755
756 fn as_any(&self) -> &dyn std::any::Any;
758
759 #[cfg(test)]
761 #[allow(dead_code)]
762 fn as_any_mut(&mut self) -> &mut dyn std::any::Any;
763
764 fn activate(&self) {}
766}
767
768#[async_trait]
769impl<T> DynPlugin for T
770where
771 T: PluginPrivate,
772 for<'de> <T as PluginPrivate>::Config: Deserialize<'de>,
773{
774 fn router_service(&self, service: router::BoxService) -> router::BoxService {
775 self.router_service(service)
776 }
777
778 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
779 self.supergraph_service(service)
780 }
781
782 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
783 self.execution_service(service)
784 }
785
786 fn subgraph_service(&self, name: &str, service: subgraph::BoxService) -> subgraph::BoxService {
787 self.subgraph_service(name, service)
788 }
789
790 fn http_client_service(
792 &self,
793 name: &str,
794 service: crate::services::http::BoxService,
795 ) -> crate::services::http::BoxService {
796 self.http_client_service(name, service)
797 }
798
799 fn connector_request_service(
800 &self,
801 service: crate::services::connector::request_service::BoxService,
802 source_name: String,
803 ) -> crate::services::connector::request_service::BoxService {
804 self.connector_request_service(service, source_name)
805 }
806
807 fn name(&self) -> &'static str {
808 self.name()
809 }
810
811 fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
813 self.web_endpoints()
814 }
815
816 fn as_any(&self) -> &dyn std::any::Any {
817 self
818 }
819
820 #[cfg(test)]
821 fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
822 self
823 }
824
825 fn activate(&self) {
826 self.activate()
827 }
828}
829
830impl<T> From<T> for Box<dyn DynPlugin>
831where
832 T: PluginPrivate,
833{
834 fn from(value: T) -> Self {
835 Box::new(value)
836 }
837}
838
839#[macro_export]
843macro_rules! register_plugin {
844 ($group: literal, $name: literal, $plugin_type: ident < $generic: ident >) => {
845 const _: () = {
847 use $crate::_private::PLUGINS;
848 use $crate::_private::PluginFactory;
849 use $crate::_private::once_cell::sync::Lazy;
850
851 #[$crate::_private::linkme::distributed_slice(PLUGINS)]
852 #[linkme(crate = $crate::_private::linkme)]
853 static REGISTER_PLUGIN: Lazy<PluginFactory> = Lazy::new(|| {
854 $crate::plugin::PluginFactory::new::<$plugin_type<$generic>>($group, $name)
855 });
856 };
857 };
858
859 ($group: literal, $name: expr, $plugin_type: ident) => {
860 const _: () = {
862 use $crate::_private::PLUGINS;
863 use $crate::_private::PluginFactory;
864 use $crate::_private::once_cell::sync::Lazy;
865
866 #[$crate::_private::linkme::distributed_slice(PLUGINS)]
867 #[linkme(crate = $crate::_private::linkme)]
868 static REGISTER_PLUGIN: Lazy<PluginFactory> =
869 Lazy::new(|| $crate::plugin::PluginFactory::new::<$plugin_type>($group, $name));
870 };
871 };
872}
873
874#[macro_export]
878macro_rules! register_private_plugin {
879 ($group: literal, $name: literal, $plugin_type: ident < $generic: ident >) => {
880 const _: () = {
882 use $crate::_private::PLUGINS;
883 use $crate::_private::PluginFactory;
884 use $crate::_private::once_cell::sync::Lazy;
885
886 #[$crate::_private::linkme::distributed_slice(PLUGINS)]
887 #[linkme(crate = $crate::_private::linkme)]
888 static REGISTER_PLUGIN: Lazy<PluginFactory> = Lazy::new(|| {
889 $crate::plugin::PluginFactory::new_private::<$plugin_type<$generic>>($group, $name)
890 });
891 };
892 };
893
894 ($group: literal, $name: literal, $plugin_type: ident) => {
895 const _: () = {
897 use $crate::_private::PLUGINS;
898 use $crate::_private::PluginFactory;
899 use $crate::_private::once_cell::sync::Lazy;
900
901 #[$crate::_private::linkme::distributed_slice(PLUGINS)]
902 #[linkme(crate = $crate::_private::linkme)]
903 static REGISTER_PLUGIN: Lazy<PluginFactory> = Lazy::new(|| {
904 $crate::plugin::PluginFactory::new_private::<$plugin_type>($group, $name)
905 });
906 };
907 };
908}
909
910#[derive(Clone)]
912pub(crate) struct Handler {
913 service: Buffer<router::Request, <router::BoxService as Service<router::Request>>::Future>,
914}
915
916impl Handler {
917 pub(crate) fn new(service: router::BoxService) -> Self {
918 Self {
919 service: ServiceBuilder::new().buffered().service(service),
920 }
921 }
922}
923
924impl Service<router::Request> for Handler {
925 type Response = router::Response;
926 type Error = BoxError;
927 type Future = ResponseFuture<BoxFuture<'static, Result<Self::Response, Self::Error>>>;
928
929 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
930 self.service.poll_ready(cx)
931 }
932
933 fn call(&mut self, req: router::Request) -> Self::Future {
934 self.service.call(req)
935 }
936}
937
938impl From<router::BoxService> for Handler {
939 fn from(original: router::BoxService) -> Self {
940 Self::new(original)
941 }
942}