1use std::fmt;
3use std::hash::Hash;
4use std::io;
5use std::io::BufReader;
6use std::iter;
7use std::net::IpAddr;
8use std::net::SocketAddr;
9use std::num::NonZeroU32;
10use std::num::NonZeroUsize;
11use std::str::FromStr;
12use std::sync::Arc;
13use std::time::Duration;
14
15use connector::ConnectorConfiguration;
16use derivative::Derivative;
17use displaydoc::Display;
18use itertools::Either;
19use itertools::Itertools;
20use once_cell::sync::Lazy;
21pub(crate) use persisted_queries::PersistedQueries;
22pub(crate) use persisted_queries::PersistedQueriesPrewarmQueryPlanCache;
23#[cfg(test)]
24pub(crate) use persisted_queries::PersistedQueriesSafelist;
25use regex::Regex;
26use rustls::ServerConfig;
27use rustls::pki_types::CertificateDer;
28use rustls::pki_types::PrivateKeyDer;
29use schemars::JsonSchema;
30use schemars::r#gen::SchemaGenerator;
31use schemars::schema::ObjectValidation;
32use schemars::schema::Schema;
33use schemars::schema::SchemaObject;
34use serde::Deserialize;
35use serde::Deserializer;
36use serde::Serialize;
37use serde_json::Map;
38use serde_json::Value;
39use sha2::Digest;
40use thiserror::Error;
41
42use self::cors::Cors;
43use self::expansion::Expansion;
44pub(crate) use self::experimental::Discussed;
45pub(crate) use self::schema::generate_config_schema;
46pub(crate) use self::schema::generate_upgrade;
47pub(crate) use self::schema::validate_yaml_configuration;
48use self::server::Server;
49use self::subgraph::SubgraphConfiguration;
50use crate::ApolloRouterError;
51use crate::cache::DEFAULT_CACHE_CAPACITY;
52use crate::configuration::cooperative_cancellation::CooperativeCancellation;
53use crate::graphql;
54use crate::notification::Notify;
55use crate::plugin::plugins;
56use crate::plugins::chaos;
57use crate::plugins::chaos::Config;
58use crate::plugins::healthcheck::Config as HealthCheck;
59#[cfg(test)]
60use crate::plugins::healthcheck::test_listen;
61use crate::plugins::limits;
62use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN;
63use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN_NAME;
64use crate::plugins::subscription::SubscriptionConfig;
65use crate::uplink::UplinkConfig;
66
67pub(crate) mod connector;
68pub(crate) mod cooperative_cancellation;
69pub(crate) mod cors;
70pub(crate) mod expansion;
71mod experimental;
72pub(crate) mod metrics;
73pub(crate) mod mode;
74mod persisted_queries;
75pub(crate) mod schema;
76pub(crate) mod server;
77pub(crate) mod shared;
78pub(crate) mod subgraph;
79#[cfg(test)]
80mod tests;
81mod upgrade;
82mod yaml;
83
84static HEARTBEAT_TIMEOUT_DURATION_SECONDS: u64 = 15;
86
87static SUPERGRAPH_ENDPOINT_REGEX: Lazy<Regex> = Lazy::new(|| {
88 Regex::new(r"(?P<first_path>.*/)(?P<sub_path>.+)\*$")
89 .expect("this regex to check the path is valid")
90});
91
92#[derive(Debug, Error, Display)]
94#[non_exhaustive]
95pub enum ConfigurationError {
96 CannotExpandVariable { key: String, cause: String },
98 UnknownExpansionMode {
100 key: String,
101 supported_modes: String,
102 },
103 PluginUnknown(String),
105 PluginConfiguration { plugin: String, error: String },
107 InvalidConfiguration {
109 message: &'static str,
110 error: String,
111 },
112 DeserializeConfigError(serde_json::Error),
114
115 InvalidExpansionModeConfig,
117
118 MigrationFailure { error: String },
120
121 CertificateAuthorities { error: String },
123}
124
125impl From<proteus::Error> for ConfigurationError {
126 fn from(error: proteus::Error) -> Self {
127 Self::MigrationFailure {
128 error: error.to_string(),
129 }
130 }
131}
132
133impl From<proteus::parser::Error> for ConfigurationError {
134 fn from(error: proteus::parser::Error) -> Self {
135 Self::MigrationFailure {
136 error: error.to_string(),
137 }
138 }
139}
140
141#[derive(Clone, Derivative, Serialize, JsonSchema)]
146#[derivative(Debug)]
147pub struct Configuration {
149 #[serde(skip)]
151 pub(crate) validated_yaml: Option<Value>,
152
153 #[serde(default)]
155 pub(crate) health_check: HealthCheck,
156
157 #[serde(default)]
159 pub(crate) sandbox: Sandbox,
160
161 #[serde(default)]
163 pub(crate) homepage: Homepage,
164
165 #[serde(default)]
167 pub(crate) server: Server,
168
169 #[serde(default)]
171 pub(crate) supergraph: Supergraph,
172
173 #[serde(default)]
175 pub(crate) cors: Cors,
176
177 #[serde(default)]
178 pub(crate) tls: Tls,
179
180 #[serde(default)]
182 pub(crate) apq: Apq,
183
184 #[serde(default)]
186 pub persisted_queries: PersistedQueries,
187
188 #[serde(default)]
190 pub(crate) limits: limits::Config,
191
192 #[serde(default)]
195 pub(crate) experimental_chaos: Config,
196
197 #[serde(default)]
199 pub(crate) plugins: UserPlugins,
200
201 #[serde(default)]
203 #[serde(flatten)]
204 pub(crate) apollo_plugins: ApolloPlugins,
205
206 #[serde(skip)]
208 pub uplink: Option<UplinkConfig>,
209
210 #[serde(default, skip_serializing, skip_deserializing)]
211 pub(crate) notify: Notify<String, graphql::Response>,
212
213 #[serde(default)]
215 pub(crate) batching: Batching,
216
217 #[serde(default)]
219 pub(crate) experimental_type_conditioned_fetching: bool,
220}
221
222impl PartialEq for Configuration {
223 fn eq(&self, other: &Self) -> bool {
224 self.validated_yaml == other.validated_yaml
225 }
226}
227
228impl<'de> serde::Deserialize<'de> for Configuration {
229 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
230 where
231 D: serde::Deserializer<'de>,
232 {
233 #[derive(Deserialize, Default)]
236 #[serde(default)]
237 struct AdHocConfiguration {
238 health_check: HealthCheck,
239 sandbox: Sandbox,
240 homepage: Homepage,
241 server: Server,
242 supergraph: Supergraph,
243 cors: Cors,
244 plugins: UserPlugins,
245 #[serde(flatten)]
246 apollo_plugins: ApolloPlugins,
247 tls: Tls,
248 apq: Apq,
249 persisted_queries: PersistedQueries,
250 limits: limits::Config,
251 experimental_chaos: chaos::Config,
252 batching: Batching,
253 experimental_type_conditioned_fetching: bool,
254 }
255 let mut ad_hoc: AdHocConfiguration = serde::Deserialize::deserialize(deserializer)?;
256
257 let notify = Configuration::notify(&ad_hoc.apollo_plugins.plugins)
258 .map_err(|e| serde::de::Error::custom(e.to_string()))?;
259
260 ad_hoc.apollo_plugins.plugins.insert(
263 "limits".to_string(),
264 serde_json::to_value(&ad_hoc.limits).unwrap(),
265 );
266 ad_hoc.apollo_plugins.plugins.insert(
267 "health_check".to_string(),
268 serde_json::to_value(&ad_hoc.health_check).unwrap(),
269 );
270
271 Configuration {
273 health_check: ad_hoc.health_check,
274 sandbox: ad_hoc.sandbox,
275 homepage: ad_hoc.homepage,
276 server: ad_hoc.server,
277 supergraph: ad_hoc.supergraph,
278 cors: ad_hoc.cors,
279 tls: ad_hoc.tls,
280 apq: ad_hoc.apq,
281 persisted_queries: ad_hoc.persisted_queries,
282 limits: ad_hoc.limits,
283 experimental_chaos: ad_hoc.experimental_chaos,
284 experimental_type_conditioned_fetching: ad_hoc.experimental_type_conditioned_fetching,
285 plugins: ad_hoc.plugins,
286 apollo_plugins: ad_hoc.apollo_plugins,
287 batching: ad_hoc.batching,
288
289 notify,
291 uplink: None,
292 validated_yaml: None,
293 }
294 .validate()
295 .map_err(|e| serde::de::Error::custom(e.to_string()))
296 }
297}
298
299pub(crate) const APOLLO_PLUGIN_PREFIX: &str = "apollo.";
300
301fn default_graphql_listen() -> ListenAddr {
302 SocketAddr::from_str("127.0.0.1:4000").unwrap().into()
303}
304
305#[cfg(test)]
306#[buildstructor::buildstructor]
307impl Configuration {
308 #[builder]
309 pub(crate) fn new(
310 supergraph: Option<Supergraph>,
311 health_check: Option<HealthCheck>,
312 sandbox: Option<Sandbox>,
313 homepage: Option<Homepage>,
314 cors: Option<Cors>,
315 plugins: Map<String, Value>,
316 apollo_plugins: Map<String, Value>,
317 tls: Option<Tls>,
318 apq: Option<Apq>,
319 persisted_query: Option<PersistedQueries>,
320 operation_limits: Option<limits::Config>,
321 chaos: Option<chaos::Config>,
322 uplink: Option<UplinkConfig>,
323 experimental_type_conditioned_fetching: Option<bool>,
324 batching: Option<Batching>,
325 server: Option<Server>,
326 ) -> Result<Self, ConfigurationError> {
327 let notify = Self::notify(&apollo_plugins)?;
328
329 let conf = Self {
330 validated_yaml: Default::default(),
331 supergraph: supergraph.unwrap_or_default(),
332 server: server.unwrap_or_default(),
333 health_check: health_check.unwrap_or_default(),
334 sandbox: sandbox.unwrap_or_default(),
335 homepage: homepage.unwrap_or_default(),
336 cors: cors.unwrap_or_default(),
337 apq: apq.unwrap_or_default(),
338 persisted_queries: persisted_query.unwrap_or_default(),
339 limits: operation_limits.unwrap_or_default(),
340 experimental_chaos: chaos.unwrap_or_default(),
341 plugins: UserPlugins {
342 plugins: Some(plugins),
343 },
344 apollo_plugins: ApolloPlugins {
345 plugins: apollo_plugins,
346 },
347 tls: tls.unwrap_or_default(),
348 uplink,
349 batching: batching.unwrap_or_default(),
350 experimental_type_conditioned_fetching: experimental_type_conditioned_fetching
351 .unwrap_or_default(),
352 notify,
353 };
354
355 conf.validate()
356 }
357}
358
359impl Configuration {
360 pub(crate) fn hash(&self) -> String {
361 let mut hasher = sha2::Sha256::new();
362 let defaulted_raw = self
363 .validated_yaml
364 .as_ref()
365 .map(|s| serde_yaml::to_string(s).expect("config was not serializable"))
366 .unwrap_or_default();
367 hasher.update(defaulted_raw);
368 let hash: String = format!("{:x}", hasher.finalize());
369 hash
370 }
371
372 fn notify(
373 apollo_plugins: &Map<String, Value>,
374 ) -> Result<Notify<String, graphql::Response>, ConfigurationError> {
375 if cfg!(test) {
376 return Ok(Notify::for_tests());
377 }
378 let notify_queue_cap = match apollo_plugins.get(APOLLO_SUBSCRIPTION_PLUGIN_NAME) {
379 Some(plugin_conf) => {
380 let conf = serde_json::from_value::<SubscriptionConfig>(plugin_conf.clone())
381 .map_err(|err| ConfigurationError::PluginConfiguration {
382 plugin: APOLLO_SUBSCRIPTION_PLUGIN.to_string(),
383 error: format!("{err:?}"),
384 })?;
385 conf.queue_capacity
386 }
387 None => None,
388 };
389 Ok(Notify::builder()
390 .and_queue_size(notify_queue_cap)
391 .ttl(Duration::from_secs(HEARTBEAT_TIMEOUT_DURATION_SECONDS))
392 .heartbeat_error_message(
393 graphql::Response::builder()
394 .errors(vec![
395 graphql::Error::builder()
396 .message("the connection has been closed because it hasn't heartbeat for a while")
397 .extension_code("SUBSCRIPTION_HEARTBEAT_ERROR")
398 .build()
399 ])
400 .build()
401 ).build())
402 }
403
404 pub(crate) fn rust_query_planner_config(
405 &self,
406 ) -> apollo_federation::query_plan::query_planner::QueryPlannerConfig {
407 use apollo_federation::query_plan::query_planner::QueryPlanIncrementalDeliveryConfig;
408 use apollo_federation::query_plan::query_planner::QueryPlannerConfig;
409 use apollo_federation::query_plan::query_planner::QueryPlannerDebugConfig;
410
411 let max_evaluated_plans = self
412 .supergraph
413 .query_planning
414 .experimental_plans_limit
415 .and_then(NonZeroU32::new)
417 .unwrap_or(NonZeroU32::new(10_000).expect("it is not zero"));
418
419 QueryPlannerConfig {
420 subgraph_graphql_validation: false,
421 generate_query_fragments: self.supergraph.generate_query_fragments,
422 incremental_delivery: QueryPlanIncrementalDeliveryConfig {
423 enable_defer: self.supergraph.defer_support,
424 },
425 type_conditioned_fetching: self.experimental_type_conditioned_fetching,
426 debug: QueryPlannerDebugConfig {
427 max_evaluated_plans,
428 paths_limit: self.supergraph.query_planning.experimental_paths_limit,
429 },
430 }
431 }
432}
433
434impl Default for Configuration {
435 fn default() -> Self {
436 Configuration::from_str("").expect("default configuration must be valid")
438 }
439}
440
441#[cfg(test)]
442#[buildstructor::buildstructor]
443impl Configuration {
444 #[builder]
445 pub(crate) fn fake_new(
446 supergraph: Option<Supergraph>,
447 health_check: Option<HealthCheck>,
448 sandbox: Option<Sandbox>,
449 homepage: Option<Homepage>,
450 cors: Option<Cors>,
451 plugins: Map<String, Value>,
452 apollo_plugins: Map<String, Value>,
453 tls: Option<Tls>,
454 notify: Option<Notify<String, graphql::Response>>,
455 apq: Option<Apq>,
456 persisted_query: Option<PersistedQueries>,
457 operation_limits: Option<limits::Config>,
458 chaos: Option<chaos::Config>,
459 uplink: Option<UplinkConfig>,
460 batching: Option<Batching>,
461 experimental_type_conditioned_fetching: Option<bool>,
462 server: Option<Server>,
463 ) -> Result<Self, ConfigurationError> {
464 let configuration = Self {
465 validated_yaml: Default::default(),
466 server: server.unwrap_or_default(),
467 supergraph: supergraph.unwrap_or_else(|| Supergraph::fake_builder().build()),
468 health_check: health_check.unwrap_or_else(|| HealthCheck::builder().build()),
469 sandbox: sandbox.unwrap_or_else(|| Sandbox::fake_builder().build()),
470 homepage: homepage.unwrap_or_else(|| Homepage::fake_builder().build()),
471 cors: cors.unwrap_or_default(),
472 limits: operation_limits.unwrap_or_default(),
473 experimental_chaos: chaos.unwrap_or_default(),
474 plugins: UserPlugins {
475 plugins: Some(plugins),
476 },
477 apollo_plugins: ApolloPlugins {
478 plugins: apollo_plugins,
479 },
480 tls: tls.unwrap_or_default(),
481 notify: notify.unwrap_or_default(),
482 apq: apq.unwrap_or_default(),
483 persisted_queries: persisted_query.unwrap_or_default(),
484 uplink,
485 experimental_type_conditioned_fetching: experimental_type_conditioned_fetching
486 .unwrap_or_default(),
487 batching: batching.unwrap_or_default(),
488 };
489
490 configuration.validate()
491 }
492}
493
494impl Configuration {
495 pub(crate) fn validate(self) -> Result<Self, ConfigurationError> {
496 if self.sandbox.enabled && self.homepage.enabled {
498 return Err(ConfigurationError::InvalidConfiguration {
499 message: "sandbox and homepage cannot be enabled at the same time",
500 error: "disable the homepage if you want to enable sandbox".to_string(),
501 });
502 }
503 if self.sandbox.enabled && !self.supergraph.introspection {
505 return Err(ConfigurationError::InvalidConfiguration {
506 message: "sandbox requires introspection",
507 error: "sandbox needs introspection to be enabled".to_string(),
508 });
509 }
510 if !self.supergraph.path.starts_with('/') {
511 return Err(ConfigurationError::InvalidConfiguration {
512 message: "invalid 'server.graphql_path' configuration",
513 error: format!(
514 "'{}' is invalid, it must be an absolute path and start with '/', you should try with '/{}'",
515 self.supergraph.path, self.supergraph.path
516 ),
517 });
518 }
519 if self.supergraph.path.ends_with('*')
520 && !self.supergraph.path.ends_with("/*")
521 && !SUPERGRAPH_ENDPOINT_REGEX.is_match(&self.supergraph.path)
522 {
523 return Err(ConfigurationError::InvalidConfiguration {
524 message: "invalid 'server.graphql_path' configuration",
525 error: format!(
526 "'{}' is invalid, you can only set a wildcard after a '/'",
527 self.supergraph.path
528 ),
529 });
530 }
531 if self.supergraph.path.contains("/*/") {
532 return Err(ConfigurationError::InvalidConfiguration {
533 message: "invalid 'server.graphql_path' configuration",
534 error: format!(
535 "'{}' is invalid, if you need to set a path like '/*/graphql' then specify it as a path parameter with a name, for example '/:my_project_key/graphql'",
536 self.supergraph.path
537 ),
538 });
539 }
540
541 if self.persisted_queries.enabled {
543 if self.persisted_queries.safelist.enabled && self.apq.enabled {
544 return Err(ConfigurationError::InvalidConfiguration {
545 message: "apqs must be disabled to enable safelisting",
546 error: "either set persisted_queries.safelist.enabled: false or apq.enabled: false in your router yaml configuration".into()
547 });
548 } else if !self.persisted_queries.safelist.enabled
549 && self.persisted_queries.safelist.require_id
550 {
551 return Err(ConfigurationError::InvalidConfiguration {
552 message: "safelist must be enabled to require IDs",
553 error: "either set persisted_queries.safelist.enabled: true or persisted_queries.safelist.require_id: false in your router yaml configuration".into()
554 });
555 }
556 } else {
557 if self.persisted_queries.safelist.enabled {
559 return Err(ConfigurationError::InvalidConfiguration {
560 message: "persisted queries must be enabled to enable safelisting",
561 error: "either set persisted_queries.safelist.enabled: false or persisted_queries.enabled: true in your router yaml configuration".into()
562 });
563 } else if self.persisted_queries.log_unknown {
564 return Err(ConfigurationError::InvalidConfiguration {
565 message: "persisted queries must be enabled to enable logging unknown operations",
566 error: "either set persisted_queries.log_unknown: false or persisted_queries.enabled: true in your router yaml configuration".into()
567 });
568 }
569 }
570
571 Ok(self)
572 }
573}
574
575impl FromStr for Configuration {
577 type Err = ConfigurationError;
578
579 fn from_str(s: &str) -> Result<Self, Self::Err> {
580 schema::validate_yaml_configuration(s, Expansion::default()?, schema::Mode::Upgrade)?
581 .validate()
582 }
583}
584
585fn gen_schema(
586 plugins: schemars::Map<String, Schema>,
587 hidden_plugins: Option<schemars::Map<String, Schema>>,
588) -> Schema {
589 let plugins_object = SchemaObject {
590 object: Some(Box::new(ObjectValidation {
591 properties: plugins,
592 additional_properties: Option::Some(Box::new(Schema::Bool(false))),
593 pattern_properties: hidden_plugins
594 .unwrap_or_default()
595 .into_iter()
596 .map(|(k, v)| (format!("^{}$", k), v))
598 .collect(),
599 ..Default::default()
600 })),
601 ..Default::default()
602 };
603
604 Schema::Object(plugins_object)
605}
606
607#[derive(Clone, Debug, Default, Deserialize, Serialize)]
613#[serde(transparent)]
614pub(crate) struct ApolloPlugins {
615 pub(crate) plugins: Map<String, Value>,
616}
617
618impl JsonSchema for ApolloPlugins {
619 fn schema_name() -> String {
620 stringify!(Plugins).to_string()
621 }
622
623 fn json_schema(generator: &mut SchemaGenerator) -> Schema {
624 let (plugin_entries, hidden_plugin_entries): (Vec<_>, Vec<_>) = crate::plugin::plugins()
628 .sorted_by_key(|factory| factory.name.clone())
629 .filter(|factory| factory.name.starts_with(APOLLO_PLUGIN_PREFIX))
630 .partition_map(|factory| {
631 let key = factory.name[APOLLO_PLUGIN_PREFIX.len()..].to_string();
632 let schema = factory.create_schema(generator);
633 if factory.hidden_from_config_json_schema {
635 Either::Right((key, schema))
636 } else {
637 Either::Left((key, schema))
638 }
639 });
640 gen_schema(
641 plugin_entries.into_iter().collect(),
642 Some(hidden_plugin_entries.into_iter().collect()),
643 )
644 }
645}
646
647#[derive(Clone, Debug, Default, Deserialize, Serialize)]
652#[serde(transparent)]
653pub(crate) struct UserPlugins {
654 pub(crate) plugins: Option<Map<String, Value>>,
655}
656
657impl JsonSchema for UserPlugins {
658 fn schema_name() -> String {
659 stringify!(Plugins).to_string()
660 }
661
662 fn json_schema(generator: &mut SchemaGenerator) -> Schema {
663 let plugins = crate::plugin::plugins()
667 .sorted_by_key(|factory| factory.name.clone())
668 .filter(|factory| !factory.name.starts_with(APOLLO_PLUGIN_PREFIX))
669 .map(|factory| (factory.name.to_string(), factory.create_schema(generator)))
670 .collect::<schemars::Map<String, Schema>>();
671 gen_schema(plugins, None)
672 }
673}
674
675#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
677#[serde(deny_unknown_fields)]
678#[serde(default)]
679pub(crate) struct Supergraph {
680 pub(crate) listen: ListenAddr,
683
684 #[serde(deserialize_with = "humantime_serde::deserialize")]
686 #[schemars(with = "String", default = "default_connection_shutdown_timeout")]
687 pub(crate) connection_shutdown_timeout: Duration,
688
689 pub(crate) path: String,
692
693 pub(crate) introspection: bool,
696
697 pub(crate) generate_query_fragments: bool,
700
701 pub(crate) defer_support: bool,
703
704 pub(crate) query_planning: QueryPlanning,
706
707 pub(crate) early_cancel: bool,
712
713 pub(crate) experimental_log_on_broken_pipe: bool,
716}
717
718const fn default_generate_query_fragments() -> bool {
719 true
720}
721
722#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
723#[serde(rename_all = "snake_case")]
724pub(crate) enum Auto {
725 Auto,
726}
727
728fn default_defer_support() -> bool {
729 true
730}
731
732#[buildstructor::buildstructor]
733impl Supergraph {
734 #[builder]
735 pub(crate) fn new(
736 listen: Option<ListenAddr>,
737 path: Option<String>,
738 connection_shutdown_timeout: Option<Duration>,
739 introspection: Option<bool>,
740 defer_support: Option<bool>,
741 query_planning: Option<QueryPlanning>,
742 generate_query_fragments: Option<bool>,
743 early_cancel: Option<bool>,
744 experimental_log_on_broken_pipe: Option<bool>,
745 ) -> Self {
746 Self {
747 listen: listen.unwrap_or_else(default_graphql_listen),
748 path: path.unwrap_or_else(default_graphql_path),
749 connection_shutdown_timeout: connection_shutdown_timeout
750 .unwrap_or_else(default_connection_shutdown_timeout),
751 introspection: introspection.unwrap_or_else(default_graphql_introspection),
752 defer_support: defer_support.unwrap_or_else(default_defer_support),
753 query_planning: query_planning.unwrap_or_default(),
754 generate_query_fragments: generate_query_fragments
755 .unwrap_or_else(default_generate_query_fragments),
756 early_cancel: early_cancel.unwrap_or_default(),
757 experimental_log_on_broken_pipe: experimental_log_on_broken_pipe.unwrap_or_default(),
758 }
759 }
760}
761
762#[cfg(test)]
763#[buildstructor::buildstructor]
764impl Supergraph {
765 #[builder]
766 pub(crate) fn fake_new(
767 listen: Option<ListenAddr>,
768 path: Option<String>,
769 connection_shutdown_timeout: Option<Duration>,
770 introspection: Option<bool>,
771 defer_support: Option<bool>,
772 query_planning: Option<QueryPlanning>,
773 generate_query_fragments: Option<bool>,
774 early_cancel: Option<bool>,
775 experimental_log_on_broken_pipe: Option<bool>,
776 ) -> Self {
777 Self {
778 listen: listen.unwrap_or_else(test_listen),
779 path: path.unwrap_or_else(default_graphql_path),
780 connection_shutdown_timeout: connection_shutdown_timeout
781 .unwrap_or_else(default_connection_shutdown_timeout),
782 introspection: introspection.unwrap_or_else(default_graphql_introspection),
783 defer_support: defer_support.unwrap_or_else(default_defer_support),
784 query_planning: query_planning.unwrap_or_default(),
785 generate_query_fragments: generate_query_fragments
786 .unwrap_or_else(default_generate_query_fragments),
787 early_cancel: early_cancel.unwrap_or_default(),
788 experimental_log_on_broken_pipe: experimental_log_on_broken_pipe.unwrap_or_default(),
789 }
790 }
791}
792
793impl Default for Supergraph {
794 fn default() -> Self {
795 Self::builder().build()
796 }
797}
798
799impl Supergraph {
800 pub(crate) fn sanitized_path(&self) -> String {
802 let mut path = self.path.clone();
803 if self.path.ends_with("/*") {
804 path = format!("{}router_extra_path", self.path);
806 } else if SUPERGRAPH_ENDPOINT_REGEX.is_match(&self.path) {
807 let new_path = SUPERGRAPH_ENDPOINT_REGEX
808 .replace(&self.path, "${first_path}${sub_path}{supergraph_route}");
809 path = new_path.to_string();
810 }
811
812 path
813 }
814}
815
816#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)]
818#[serde(deny_unknown_fields)]
819pub(crate) struct Router {
820 #[serde(default)]
821 pub(crate) cache: Cache,
822}
823
824#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
826#[serde(deny_unknown_fields, default)]
827pub(crate) struct Apq {
828 pub(crate) enabled: bool,
830
831 pub(crate) router: Router,
832
833 pub(crate) subgraph: SubgraphConfiguration<SubgraphApq>,
834}
835
836#[cfg(test)]
837#[buildstructor::buildstructor]
838impl Apq {
839 #[builder]
840 pub(crate) fn fake_new(enabled: Option<bool>) -> Self {
841 Self {
842 enabled: enabled.unwrap_or_else(default_apq),
843 ..Default::default()
844 }
845 }
846}
847
848#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
850#[serde(deny_unknown_fields, default)]
851pub(crate) struct SubgraphApq {
852 pub(crate) enabled: bool,
854}
855
856fn default_apq() -> bool {
857 true
858}
859
860impl Default for Apq {
861 fn default() -> Self {
862 Self {
863 enabled: default_apq(),
864 router: Default::default(),
865 subgraph: Default::default(),
866 }
867 }
868}
869
870#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)]
872#[serde(deny_unknown_fields, default)]
873pub(crate) struct QueryPlanning {
874 pub(crate) cache: QueryPlanCache,
876 pub(crate) warmed_up_queries: Option<usize>,
881
882 pub(crate) experimental_plans_limit: Option<u32>,
890
891 pub(crate) experimental_paths_limit: Option<u32>,
904
905 pub(crate) experimental_reuse_query_plans: bool,
908
909 pub(crate) experimental_cooperative_cancellation: CooperativeCancellation,
913}
914
915#[buildstructor::buildstructor]
916impl QueryPlanning {
917 #[builder]
918 #[allow(dead_code)]
919 pub(crate) fn new(
920 cache: Option<QueryPlanCache>,
921 warmed_up_queries: Option<usize>,
922 experimental_plans_limit: Option<u32>,
923 experimental_paths_limit: Option<u32>,
924 experimental_reuse_query_plans: Option<bool>,
925 experimental_cooperative_cancellation: Option<CooperativeCancellation>,
926 ) -> Self {
927 Self {
928 cache: cache.unwrap_or_default(),
929 warmed_up_queries,
930 experimental_plans_limit,
931 experimental_paths_limit,
932 experimental_reuse_query_plans: experimental_reuse_query_plans.unwrap_or_default(),
933 experimental_cooperative_cancellation: experimental_cooperative_cancellation
934 .unwrap_or_default(),
935 }
936 }
937}
938
939#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
941#[serde(deny_unknown_fields, default)]
942pub(crate) struct QueryPlanCache {
943 pub(crate) in_memory: InMemoryCache,
945 pub(crate) redis: Option<QueryPlanRedisCache>,
947}
948
949#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
950#[serde(deny_unknown_fields)]
951pub(crate) struct QueryPlanRedisCache {
953 pub(crate) urls: Vec<url::Url>,
955
956 pub(crate) username: Option<String>,
958 pub(crate) password: Option<String>,
960
961 #[serde(deserialize_with = "humantime_serde::deserialize", default)]
962 #[schemars(with = "Option<String>", default)]
963 pub(crate) timeout: Option<Duration>,
965
966 #[serde(
967 deserialize_with = "humantime_serde::deserialize",
968 default = "default_query_plan_cache_ttl"
969 )]
970 #[schemars(with = "Option<String>", default = "default_query_plan_cache_ttl")]
971 pub(crate) ttl: Duration,
973
974 pub(crate) namespace: Option<String>,
976
977 #[serde(default)]
978 pub(crate) tls: Option<TlsClient>,
980
981 #[serde(default = "default_required_to_start")]
982 pub(crate) required_to_start: bool,
984
985 #[serde(default = "default_reset_ttl")]
986 pub(crate) reset_ttl: bool,
988
989 #[serde(default = "default_query_planner_cache_pool_size")]
990 pub(crate) pool_size: u32,
992}
993
994fn default_query_plan_cache_ttl() -> Duration {
995 Duration::from_secs(86400 * 30)
997}
998
999fn default_query_planner_cache_pool_size() -> u32 {
1000 1
1001}
1002
1003#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1005#[serde(deny_unknown_fields, default)]
1006pub(crate) struct Cache {
1007 pub(crate) in_memory: InMemoryCache,
1009 pub(crate) redis: Option<RedisCache>,
1011}
1012
1013impl From<QueryPlanCache> for Cache {
1014 fn from(value: QueryPlanCache) -> Self {
1015 Cache {
1016 in_memory: value.in_memory,
1017 redis: value.redis.map(Into::into),
1018 }
1019 }
1020}
1021
1022#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1023#[serde(deny_unknown_fields)]
1024pub(crate) struct InMemoryCache {
1026 pub(crate) limit: NonZeroUsize,
1028}
1029
1030impl Default for InMemoryCache {
1031 fn default() -> Self {
1032 Self {
1033 limit: DEFAULT_CACHE_CAPACITY,
1034 }
1035 }
1036}
1037
1038#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1039#[serde(deny_unknown_fields)]
1040pub(crate) struct RedisCache {
1042 pub(crate) urls: Vec<url::Url>,
1044
1045 pub(crate) username: Option<String>,
1047 pub(crate) password: Option<String>,
1049
1050 #[serde(deserialize_with = "humantime_serde::deserialize", default)]
1051 #[schemars(with = "Option<String>", default)]
1052 pub(crate) timeout: Option<Duration>,
1054
1055 #[serde(deserialize_with = "humantime_serde::deserialize", default)]
1056 #[schemars(with = "Option<String>", default)]
1057 pub(crate) ttl: Option<Duration>,
1059
1060 pub(crate) namespace: Option<String>,
1062
1063 #[serde(default)]
1064 pub(crate) tls: Option<TlsClient>,
1066
1067 #[serde(default = "default_required_to_start")]
1068 pub(crate) required_to_start: bool,
1070
1071 #[serde(default = "default_reset_ttl")]
1072 pub(crate) reset_ttl: bool,
1074
1075 #[serde(default = "default_pool_size")]
1076 pub(crate) pool_size: u32,
1078 #[serde(
1079 deserialize_with = "humantime_serde::deserialize",
1080 default = "default_metrics_interval"
1081 )]
1082 #[schemars(with = "Option<String>", default)]
1083 pub(crate) metrics_interval: Duration,
1085}
1086
1087fn default_required_to_start() -> bool {
1088 false
1089}
1090
1091fn default_pool_size() -> u32 {
1092 1
1093}
1094
1095pub(crate) fn default_metrics_interval() -> Duration {
1096 Duration::from_secs(1)
1097}
1098
1099impl From<QueryPlanRedisCache> for RedisCache {
1100 fn from(value: QueryPlanRedisCache) -> Self {
1101 RedisCache {
1102 urls: value.urls,
1103 username: value.username,
1104 password: value.password,
1105 timeout: value.timeout,
1106 ttl: Some(value.ttl),
1107 namespace: value.namespace,
1108 tls: value.tls,
1109 required_to_start: value.required_to_start,
1110 reset_ttl: value.reset_ttl,
1111 pool_size: value.pool_size,
1112 metrics_interval: default_metrics_interval(),
1113 }
1114 }
1115}
1116
1117fn default_reset_ttl() -> bool {
1118 true
1119}
1120
1121#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1123#[serde(deny_unknown_fields)]
1124#[serde(default)]
1125pub(crate) struct Tls {
1126 pub(crate) supergraph: Option<Arc<TlsSupergraph>>,
1130 pub(crate) subgraph: SubgraphConfiguration<TlsClient>,
1131 pub(crate) connector: ConnectorConfiguration<TlsClient>,
1132}
1133
1134#[derive(Debug, Deserialize, Serialize, JsonSchema)]
1136#[serde(deny_unknown_fields)]
1137pub(crate) struct TlsSupergraph {
1138 #[serde(deserialize_with = "deserialize_certificate", skip_serializing)]
1140 #[schemars(with = "String")]
1141 pub(crate) certificate: CertificateDer<'static>,
1142 #[serde(deserialize_with = "deserialize_key", skip_serializing)]
1144 #[schemars(with = "String")]
1145 pub(crate) key: PrivateKeyDer<'static>,
1146 #[serde(deserialize_with = "deserialize_certificate_chain", skip_serializing)]
1148 #[schemars(with = "String")]
1149 pub(crate) certificate_chain: Vec<CertificateDer<'static>>,
1150}
1151
1152impl TlsSupergraph {
1153 pub(crate) fn tls_config(&self) -> Result<Arc<rustls::ServerConfig>, ApolloRouterError> {
1154 let mut certificates = vec![self.certificate.clone()];
1155 certificates.extend(self.certificate_chain.iter().cloned());
1156
1157 let mut config = ServerConfig::builder()
1158 .with_no_client_auth()
1159 .with_single_cert(certificates, self.key.clone_key())
1160 .map_err(ApolloRouterError::Rustls)?;
1161 config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
1162
1163 Ok(Arc::new(config))
1164 }
1165}
1166
1167fn deserialize_certificate<'de, D>(deserializer: D) -> Result<CertificateDer<'static>, D::Error>
1168where
1169 D: Deserializer<'de>,
1170{
1171 let data = String::deserialize(deserializer)?;
1172
1173 load_certs(&data)
1174 .map_err(serde::de::Error::custom)
1175 .and_then(|mut certs| {
1176 if certs.len() > 1 {
1177 Err(serde::de::Error::custom("expected exactly one certificate"))
1178 } else {
1179 certs
1180 .pop()
1181 .ok_or(serde::de::Error::custom("expected exactly one certificate"))
1182 }
1183 })
1184}
1185
1186fn deserialize_certificate_chain<'de, D>(
1187 deserializer: D,
1188) -> Result<Vec<CertificateDer<'static>>, D::Error>
1189where
1190 D: Deserializer<'de>,
1191{
1192 let data = String::deserialize(deserializer)?;
1193
1194 load_certs(&data).map_err(serde::de::Error::custom)
1195}
1196
1197fn deserialize_key<'de, D>(deserializer: D) -> Result<PrivateKeyDer<'static>, D::Error>
1198where
1199 D: Deserializer<'de>,
1200{
1201 let data = String::deserialize(deserializer)?;
1202
1203 load_key(&data).map_err(serde::de::Error::custom)
1204}
1205
1206#[derive(thiserror::Error, Debug)]
1207#[error("could not load TLS certificate: {0}")]
1208struct LoadCertError(std::io::Error);
1209
1210pub(crate) fn load_certs(data: &str) -> io::Result<Vec<CertificateDer<'static>>> {
1211 rustls_pemfile::certs(&mut BufReader::new(data.as_bytes()))
1212 .collect::<Result<Vec<_>, _>>()
1213 .map_err(|error| io::Error::new(io::ErrorKind::InvalidInput, LoadCertError(error)))
1214}
1215
1216pub(crate) fn load_key(data: &str) -> io::Result<PrivateKeyDer<'static>> {
1217 let mut reader = BufReader::new(data.as_bytes());
1218 let mut key_iterator = iter::from_fn(|| rustls_pemfile::read_one(&mut reader).transpose());
1219
1220 let private_key = match key_iterator.next() {
1221 Some(Ok(rustls_pemfile::Item::Pkcs1Key(key))) => PrivateKeyDer::from(key),
1222 Some(Ok(rustls_pemfile::Item::Pkcs8Key(key))) => PrivateKeyDer::from(key),
1223 Some(Ok(rustls_pemfile::Item::Sec1Key(key))) => PrivateKeyDer::from(key),
1224 Some(Err(e)) => {
1225 return Err(io::Error::new(
1226 io::ErrorKind::InvalidInput,
1227 format!("could not parse the key: {e}"),
1228 ));
1229 }
1230 Some(_) => {
1231 return Err(io::Error::new(
1232 io::ErrorKind::InvalidInput,
1233 "expected a private key",
1234 ));
1235 }
1236 None => {
1237 return Err(io::Error::new(
1238 io::ErrorKind::InvalidInput,
1239 "could not find a private key",
1240 ));
1241 }
1242 };
1243
1244 if key_iterator.next().is_some() {
1245 return Err(io::Error::new(
1246 io::ErrorKind::InvalidInput,
1247 "expected exactly one private key",
1248 ));
1249 }
1250 Ok(private_key)
1251}
1252
1253#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1255#[serde(deny_unknown_fields)]
1256#[serde(default)]
1257pub(crate) struct TlsClient {
1258 pub(crate) certificate_authorities: Option<String>,
1260 pub(crate) client_authentication: Option<Arc<TlsClientAuth>>,
1262}
1263
1264#[buildstructor::buildstructor]
1265impl TlsClient {
1266 #[builder]
1267 pub(crate) fn new(
1268 certificate_authorities: Option<String>,
1269 client_authentication: Option<Arc<TlsClientAuth>>,
1270 ) -> Self {
1271 Self {
1272 certificate_authorities,
1273 client_authentication,
1274 }
1275 }
1276}
1277
1278impl Default for TlsClient {
1279 fn default() -> Self {
1280 Self::builder().build()
1281 }
1282}
1283
1284#[derive(Debug, Deserialize, Serialize, JsonSchema)]
1286#[serde(deny_unknown_fields)]
1287pub(crate) struct TlsClientAuth {
1288 #[serde(deserialize_with = "deserialize_certificate_chain", skip_serializing)]
1290 #[schemars(with = "String")]
1291 pub(crate) certificate_chain: Vec<CertificateDer<'static>>,
1292 #[serde(deserialize_with = "deserialize_key", skip_serializing)]
1294 #[schemars(with = "String")]
1295 pub(crate) key: PrivateKeyDer<'static>,
1296}
1297
1298#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1300#[serde(deny_unknown_fields)]
1301#[serde(default)]
1302pub(crate) struct Sandbox {
1303 pub(crate) enabled: bool,
1305}
1306
1307fn default_sandbox() -> bool {
1308 false
1309}
1310
1311#[buildstructor::buildstructor]
1312impl Sandbox {
1313 #[builder]
1314 pub(crate) fn new(enabled: Option<bool>) -> Self {
1315 Self {
1316 enabled: enabled.unwrap_or_else(default_sandbox),
1317 }
1318 }
1319}
1320
1321#[cfg(test)]
1322#[buildstructor::buildstructor]
1323impl Sandbox {
1324 #[builder]
1325 pub(crate) fn fake_new(enabled: Option<bool>) -> Self {
1326 Self {
1327 enabled: enabled.unwrap_or_else(default_sandbox),
1328 }
1329 }
1330}
1331
1332impl Default for Sandbox {
1333 fn default() -> Self {
1334 Self::builder().build()
1335 }
1336}
1337
1338#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1340#[serde(deny_unknown_fields)]
1341#[serde(default)]
1342pub(crate) struct Homepage {
1343 pub(crate) enabled: bool,
1345 pub(crate) graph_ref: Option<String>,
1348}
1349
1350fn default_homepage() -> bool {
1351 true
1352}
1353
1354#[buildstructor::buildstructor]
1355impl Homepage {
1356 #[builder]
1357 pub(crate) fn new(enabled: Option<bool>) -> Self {
1358 Self {
1359 enabled: enabled.unwrap_or_else(default_homepage),
1360 graph_ref: None,
1361 }
1362 }
1363}
1364
1365#[cfg(test)]
1366#[buildstructor::buildstructor]
1367impl Homepage {
1368 #[builder]
1369 pub(crate) fn fake_new(enabled: Option<bool>) -> Self {
1370 Self {
1371 enabled: enabled.unwrap_or_else(default_homepage),
1372 graph_ref: None,
1373 }
1374 }
1375}
1376
1377impl Default for Homepage {
1378 fn default() -> Self {
1379 Self::builder().enabled(default_homepage()).build()
1380 }
1381}
1382
1383#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema)]
1385#[serde(untagged)]
1386pub enum ListenAddr {
1387 SocketAddr(SocketAddr),
1389 #[cfg(unix)]
1391 UnixSocket(std::path::PathBuf),
1392}
1393
1394impl ListenAddr {
1395 pub(crate) fn ip_and_port(&self) -> Option<(IpAddr, u16)> {
1396 #[cfg_attr(not(unix), allow(irrefutable_let_patterns))]
1397 if let Self::SocketAddr(addr) = self {
1398 Some((addr.ip(), addr.port()))
1399 } else {
1400 None
1401 }
1402 }
1403}
1404
1405impl From<SocketAddr> for ListenAddr {
1406 fn from(addr: SocketAddr) -> Self {
1407 Self::SocketAddr(addr)
1408 }
1409}
1410
1411#[allow(clippy::from_over_into)]
1412impl Into<serde_json::Value> for ListenAddr {
1413 fn into(self) -> serde_json::Value {
1414 match self {
1415 Self::SocketAddr(addr) => serde_json::Value::String(addr.to_string()),
1418 #[cfg(unix)]
1419 Self::UnixSocket(path) => serde_json::Value::String(
1420 path.as_os_str()
1421 .to_str()
1422 .expect("unsupported non-UTF-8 path")
1423 .to_string(),
1424 ),
1425 }
1426 }
1427}
1428
1429#[cfg(unix)]
1430impl From<tokio_util::either::Either<std::net::SocketAddr, tokio::net::unix::SocketAddr>>
1431 for ListenAddr
1432{
1433 fn from(
1434 addr: tokio_util::either::Either<std::net::SocketAddr, tokio::net::unix::SocketAddr>,
1435 ) -> Self {
1436 match addr {
1437 tokio_util::either::Either::Left(addr) => Self::SocketAddr(addr),
1438 tokio_util::either::Either::Right(addr) => Self::UnixSocket(
1439 addr.as_pathname()
1440 .map(ToOwned::to_owned)
1441 .unwrap_or_default(),
1442 ),
1443 }
1444 }
1445}
1446
1447impl fmt::Display for ListenAddr {
1448 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1449 match self {
1450 Self::SocketAddr(addr) => write!(f, "http://{addr}"),
1451 #[cfg(unix)]
1452 Self::UnixSocket(path) => write!(f, "{}", path.display()),
1453 }
1454 }
1455}
1456
1457fn default_graphql_path() -> String {
1458 String::from("/")
1459}
1460
1461fn default_graphql_introspection() -> bool {
1462 false
1463}
1464
1465fn default_connection_shutdown_timeout() -> Duration {
1466 Duration::from_secs(60)
1467}
1468
1469#[derive(Clone, Debug, Default, Error, Display, Serialize, Deserialize, JsonSchema)]
1470#[serde(deny_unknown_fields, rename_all = "snake_case")]
1471pub(crate) enum BatchingMode {
1472 #[default]
1474 BatchHttpLink,
1475}
1476
1477#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1479#[serde(deny_unknown_fields)]
1480pub(crate) struct Batching {
1481 #[serde(default)]
1483 pub(crate) enabled: bool,
1484
1485 pub(crate) mode: BatchingMode,
1487
1488 pub(crate) subgraph: Option<SubgraphConfiguration<CommonBatchingConfig>>,
1490
1491 #[serde(default)]
1493 pub(crate) maximum_size: Option<usize>,
1494}
1495
1496#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1498pub(crate) struct CommonBatchingConfig {
1499 pub(crate) enabled: bool,
1501}
1502
1503impl Batching {
1504 pub(crate) fn batch_include(&self, service_name: &str) -> bool {
1506 match &self.subgraph {
1507 Some(subgraph_batching_config) => {
1508 if subgraph_batching_config.all.enabled {
1510 subgraph_batching_config
1514 .subgraphs
1515 .get(service_name)
1516 .is_none_or(|x| x.enabled)
1517 } else {
1518 subgraph_batching_config
1521 .subgraphs
1522 .get(service_name)
1523 .is_some_and(|x| x.enabled)
1524 }
1525 }
1526 None => false,
1527 }
1528 }
1529
1530 pub(crate) fn exceeds_batch_size<T>(&self, batch: &[T]) -> bool {
1531 match self.maximum_size {
1532 Some(maximum_size) => batch.len() > maximum_size,
1533 None => false,
1534 }
1535 }
1536}