1use std::collections::BTreeMap;
3use std::fmt;
4use std::hash::Hash;
5use std::io;
6use std::io::BufReader;
7use std::iter;
8use std::net::IpAddr;
9use std::net::SocketAddr;
10use std::num::NonZeroU32;
11use std::num::NonZeroUsize;
12use std::str::FromStr;
13use std::sync::Arc;
14use std::time::Duration;
15
16use connector::ConnectorConfiguration;
17use derivative::Derivative;
18use displaydoc::Display;
19use itertools::Either;
20use itertools::Itertools;
21use once_cell::sync::Lazy;
22pub(crate) use persisted_queries::PersistedQueries;
23pub(crate) use persisted_queries::PersistedQueriesPrewarmQueryPlanCache;
24#[cfg(test)]
25pub(crate) use persisted_queries::PersistedQueriesSafelist;
26use regex::Regex;
27use rustls::ServerConfig;
28use rustls::pki_types::CertificateDer;
29use rustls::pki_types::PrivateKeyDer;
30use schemars::JsonSchema;
31use schemars::Schema;
32use schemars::SchemaGenerator;
33use serde::Deserialize;
34use serde::Deserializer;
35use serde::Serialize;
36use serde_json::Map;
37use serde_json::Value;
38use sha2::Digest;
39use thiserror::Error;
40
41use self::cors::Cors;
42use self::expansion::Expansion;
43pub(crate) use self::experimental::Discussed;
44pub(crate) use self::schema::generate_config_schema;
45pub(crate) use self::schema::generate_upgrade;
46pub(crate) use self::schema::validate_yaml_configuration;
47use self::server::Server;
48use self::subgraph::SubgraphConfiguration;
49use crate::ApolloRouterError;
50use crate::cache::DEFAULT_CACHE_CAPACITY;
51use crate::configuration::cooperative_cancellation::CooperativeCancellation;
52use crate::graphql;
53use crate::plugin::plugins;
54use crate::plugins::chaos;
55use crate::plugins::chaos::Config;
56use crate::plugins::healthcheck::Config as HealthCheck;
57#[cfg(test)]
58use crate::plugins::healthcheck::test_listen;
59use crate::plugins::limits;
60use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN;
61use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN_NAME;
62use crate::plugins::subscription::SubscriptionConfig;
63use crate::plugins::subscription::notification::Notify;
64use crate::uplink::UplinkConfig;
65
66pub(crate) mod connector;
67pub(crate) mod cooperative_cancellation;
68pub(crate) mod cors;
69pub(crate) mod expansion;
70mod experimental;
71pub(crate) mod metrics;
72pub(crate) mod mode;
73mod persisted_queries;
74pub(crate) mod schema;
75pub(crate) mod server;
76pub(crate) mod shared;
77pub(crate) mod subgraph;
78#[cfg(test)]
79mod tests;
80mod upgrade;
81mod yaml;
82
83static HEARTBEAT_TIMEOUT_DURATION_SECONDS: u64 = 15;
85
86static SUPERGRAPH_ENDPOINT_REGEX: Lazy<Regex> = Lazy::new(|| {
87 Regex::new(r"(?P<first_path>.*/)(?P<sub_path>.+)\*$")
88 .expect("this regex to check the path is valid")
89});
90
91#[derive(Debug, Error, Display)]
93#[non_exhaustive]
94pub enum ConfigurationError {
95 CannotExpandVariable { key: String, cause: String },
97 UnknownExpansionMode {
99 key: String,
100 supported_modes: String,
101 },
102 PluginUnknown(String),
104 PluginConfiguration { plugin: String, error: String },
106 InvalidConfiguration {
108 message: &'static str,
109 error: String,
110 },
111 DeserializeConfigError(serde_json::Error),
113
114 InvalidExpansionModeConfig,
116
117 MigrationFailure { error: String },
119
120 CertificateAuthorities { error: String },
122}
123
124impl From<proteus::Error> for ConfigurationError {
125 fn from(error: proteus::Error) -> Self {
126 Self::MigrationFailure {
127 error: error.to_string(),
128 }
129 }
130}
131
132impl From<proteus::parser::Error> for ConfigurationError {
133 fn from(error: proteus::parser::Error) -> Self {
134 Self::MigrationFailure {
135 error: error.to_string(),
136 }
137 }
138}
139
140#[derive(Clone, Derivative, Serialize, JsonSchema)]
145#[derivative(Debug)]
146pub struct Configuration {
148 #[serde(skip)]
150 pub(crate) validated_yaml: Option<Value>,
151
152 #[serde(skip)]
154 pub(crate) raw_yaml: Option<Arc<str>>,
155
156 #[serde(default)]
158 pub(crate) health_check: HealthCheck,
159
160 #[serde(default)]
162 pub(crate) sandbox: Sandbox,
163
164 #[serde(default)]
166 pub(crate) homepage: Homepage,
167
168 #[serde(default)]
170 pub(crate) server: Server,
171
172 #[serde(default)]
174 pub(crate) supergraph: Supergraph,
175
176 #[serde(default)]
178 pub(crate) cors: Cors,
179
180 #[serde(default)]
181 pub(crate) tls: Tls,
182
183 #[serde(default)]
185 pub(crate) apq: Apq,
186
187 #[serde(default)]
189 pub persisted_queries: PersistedQueries,
190
191 #[serde(default)]
193 pub(crate) limits: limits::Config,
194
195 #[serde(default)]
198 pub(crate) experimental_chaos: Config,
199
200 #[serde(default)]
202 pub(crate) plugins: UserPlugins,
203
204 #[serde(default)]
206 #[serde(flatten)]
207 pub(crate) apollo_plugins: ApolloPlugins,
208
209 #[serde(skip)]
211 pub uplink: Option<UplinkConfig>,
212
213 #[serde(default, skip_serializing, skip_deserializing)]
216 pub(crate) notify: Notify<String, graphql::Response>,
217
218 #[serde(default)]
220 pub(crate) batching: Batching,
221
222 #[serde(default)]
224 pub(crate) experimental_type_conditioned_fetching: bool,
225}
226
227impl PartialEq for Configuration {
228 fn eq(&self, other: &Self) -> bool {
229 self.validated_yaml == other.validated_yaml
230 }
231}
232
233impl<'de> serde::Deserialize<'de> for Configuration {
234 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
235 where
236 D: serde::Deserializer<'de>,
237 {
238 #[derive(Deserialize, Default)]
241 #[serde(default)]
242 struct AdHocConfiguration {
243 health_check: HealthCheck,
244 sandbox: Sandbox,
245 homepage: Homepage,
246 server: Server,
247 supergraph: Supergraph,
248 cors: Cors,
249 plugins: UserPlugins,
250 #[serde(flatten)]
251 apollo_plugins: ApolloPlugins,
252 tls: Tls,
253 apq: Apq,
254 persisted_queries: PersistedQueries,
255 limits: limits::Config,
256 experimental_chaos: chaos::Config,
257 batching: Batching,
258 experimental_type_conditioned_fetching: bool,
259 }
260 let mut ad_hoc: AdHocConfiguration = serde::Deserialize::deserialize(deserializer)?;
261
262 let notify = Configuration::notify(&ad_hoc.apollo_plugins.plugins)
263 .map_err(|e| serde::de::Error::custom(e.to_string()))?;
264
265 ad_hoc.apollo_plugins.plugins.insert(
268 "limits".to_string(),
269 serde_json::to_value(&ad_hoc.limits).unwrap(),
270 );
271 ad_hoc.apollo_plugins.plugins.insert(
272 "health_check".to_string(),
273 serde_json::to_value(&ad_hoc.health_check).unwrap(),
274 );
275
276 Configuration {
278 health_check: ad_hoc.health_check,
279 sandbox: ad_hoc.sandbox,
280 homepage: ad_hoc.homepage,
281 server: ad_hoc.server,
282 supergraph: ad_hoc.supergraph,
283 cors: ad_hoc.cors,
284 tls: ad_hoc.tls,
285 apq: ad_hoc.apq,
286 persisted_queries: ad_hoc.persisted_queries,
287 limits: ad_hoc.limits,
288 experimental_chaos: ad_hoc.experimental_chaos,
289 experimental_type_conditioned_fetching: ad_hoc.experimental_type_conditioned_fetching,
290 plugins: ad_hoc.plugins,
291 apollo_plugins: ad_hoc.apollo_plugins,
292 batching: ad_hoc.batching,
293
294 notify,
296 uplink: None,
297 validated_yaml: None,
298 raw_yaml: None,
299 }
300 .validate()
301 .map_err(|e| serde::de::Error::custom(e.to_string()))
302 }
303}
304
305pub(crate) const APOLLO_PLUGIN_PREFIX: &str = "apollo.";
306
307fn default_graphql_listen() -> ListenAddr {
308 SocketAddr::from_str("127.0.0.1:4000").unwrap().into()
309}
310
311#[cfg(test)]
312#[buildstructor::buildstructor]
313impl Configuration {
314 #[builder]
315 pub(crate) fn new(
316 supergraph: Option<Supergraph>,
317 health_check: Option<HealthCheck>,
318 sandbox: Option<Sandbox>,
319 homepage: Option<Homepage>,
320 cors: Option<Cors>,
321 plugins: Map<String, Value>,
322 apollo_plugins: Map<String, Value>,
323 tls: Option<Tls>,
324 apq: Option<Apq>,
325 persisted_query: Option<PersistedQueries>,
326 operation_limits: Option<limits::Config>,
327 chaos: Option<chaos::Config>,
328 uplink: Option<UplinkConfig>,
329 experimental_type_conditioned_fetching: Option<bool>,
330 batching: Option<Batching>,
331 server: Option<Server>,
332 ) -> Result<Self, ConfigurationError> {
333 let notify = Self::notify(&apollo_plugins)?;
334
335 let conf = Self {
336 validated_yaml: Default::default(),
337 raw_yaml: None,
338 supergraph: supergraph.unwrap_or_default(),
339 server: server.unwrap_or_default(),
340 health_check: health_check.unwrap_or_default(),
341 sandbox: sandbox.unwrap_or_default(),
342 homepage: homepage.unwrap_or_default(),
343 cors: cors.unwrap_or_default(),
344 apq: apq.unwrap_or_default(),
345 persisted_queries: persisted_query.unwrap_or_default(),
346 limits: operation_limits.unwrap_or_default(),
347 experimental_chaos: chaos.unwrap_or_default(),
348 plugins: UserPlugins {
349 plugins: Some(plugins),
350 },
351 apollo_plugins: ApolloPlugins {
352 plugins: apollo_plugins,
353 },
354 tls: tls.unwrap_or_default(),
355 uplink,
356 batching: batching.unwrap_or_default(),
357 experimental_type_conditioned_fetching: experimental_type_conditioned_fetching
358 .unwrap_or_default(),
359 notify,
360 };
361
362 conf.validate()
363 }
364}
365
366impl Configuration {
367 pub(crate) fn hash(&self) -> String {
368 let mut hasher = sha2::Sha256::new();
369 let defaulted_raw = self
370 .validated_yaml
371 .as_ref()
372 .map(|s| serde_yaml::to_string(s).expect("config was not serializable"))
373 .unwrap_or_default();
374 hasher.update(defaulted_raw);
375 let hash: String = format!("{:x}", hasher.finalize());
376 hash
377 }
378
379 fn notify(
380 apollo_plugins: &Map<String, Value>,
381 ) -> Result<Notify<String, graphql::Response>, ConfigurationError> {
382 if cfg!(test) {
383 return Ok(Notify::for_tests());
384 }
385 let notify_queue_cap = match apollo_plugins.get(APOLLO_SUBSCRIPTION_PLUGIN_NAME) {
386 Some(plugin_conf) => {
387 let conf = serde_json::from_value::<SubscriptionConfig>(plugin_conf.clone())
388 .map_err(|err| ConfigurationError::PluginConfiguration {
389 plugin: APOLLO_SUBSCRIPTION_PLUGIN.to_string(),
390 error: format!("{err:?}"),
391 })?;
392 conf.queue_capacity
393 }
394 None => None,
395 };
396 Ok(Notify::builder()
397 .and_queue_size(notify_queue_cap)
398 .ttl(Duration::from_secs(HEARTBEAT_TIMEOUT_DURATION_SECONDS))
399 .heartbeat_error_message(
400 graphql::Response::builder()
401 .errors(vec![
402 graphql::Error::builder()
403 .message("the connection has been closed because it hasn't heartbeat for a while")
404 .extension_code("SUBSCRIPTION_HEARTBEAT_ERROR")
405 .build()
406 ])
407 .build()
408 ).build())
409 }
410
411 pub(crate) fn rust_query_planner_config(
412 &self,
413 ) -> apollo_federation::query_plan::query_planner::QueryPlannerConfig {
414 use apollo_federation::query_plan::query_planner::QueryPlanIncrementalDeliveryConfig;
415 use apollo_federation::query_plan::query_planner::QueryPlannerConfig;
416 use apollo_federation::query_plan::query_planner::QueryPlannerDebugConfig;
417
418 let max_evaluated_plans = self
419 .supergraph
420 .query_planning
421 .experimental_plans_limit
422 .and_then(NonZeroU32::new)
424 .unwrap_or(NonZeroU32::new(10_000).expect("it is not zero"));
425
426 QueryPlannerConfig {
427 subgraph_graphql_validation: false,
428 generate_query_fragments: self.supergraph.generate_query_fragments,
429 incremental_delivery: QueryPlanIncrementalDeliveryConfig {
430 enable_defer: self.supergraph.defer_support,
431 },
432 type_conditioned_fetching: self.experimental_type_conditioned_fetching,
433 debug: QueryPlannerDebugConfig {
434 max_evaluated_plans,
435 paths_limit: self.supergraph.query_planning.experimental_paths_limit,
436 },
437 }
438 }
439
440 fn apollo_plugin_enabled(&self, plugin_name: &str) -> bool {
441 self.apollo_plugins
442 .plugins
443 .get(plugin_name)
444 .and_then(|config| config.as_object().and_then(|c| c.get("enabled")))
445 .and_then(|enabled| enabled.as_bool())
446 .unwrap_or(false)
447 }
448}
449
450impl Default for Configuration {
451 fn default() -> Self {
452 Configuration::from_str("").expect("default configuration must be valid")
454 }
455}
456
457#[cfg(test)]
458#[buildstructor::buildstructor]
459impl Configuration {
460 #[builder]
461 pub(crate) fn fake_new(
462 supergraph: Option<Supergraph>,
463 health_check: Option<HealthCheck>,
464 sandbox: Option<Sandbox>,
465 homepage: Option<Homepage>,
466 cors: Option<Cors>,
467 plugins: Map<String, Value>,
468 apollo_plugins: Map<String, Value>,
469 tls: Option<Tls>,
470 notify: Option<Notify<String, graphql::Response>>,
471 apq: Option<Apq>,
472 persisted_query: Option<PersistedQueries>,
473 operation_limits: Option<limits::Config>,
474 chaos: Option<chaos::Config>,
475 uplink: Option<UplinkConfig>,
476 batching: Option<Batching>,
477 experimental_type_conditioned_fetching: Option<bool>,
478 server: Option<Server>,
479 ) -> Result<Self, ConfigurationError> {
480 let configuration = Self {
481 validated_yaml: Default::default(),
482 server: server.unwrap_or_default(),
483 supergraph: supergraph.unwrap_or_else(|| Supergraph::fake_builder().build()),
484 health_check: health_check.unwrap_or_else(|| HealthCheck::builder().build()),
485 sandbox: sandbox.unwrap_or_else(|| Sandbox::fake_builder().build()),
486 homepage: homepage.unwrap_or_else(|| Homepage::fake_builder().build()),
487 cors: cors.unwrap_or_default(),
488 limits: operation_limits.unwrap_or_default(),
489 experimental_chaos: chaos.unwrap_or_default(),
490 plugins: UserPlugins {
491 plugins: Some(plugins),
492 },
493 apollo_plugins: ApolloPlugins {
494 plugins: apollo_plugins,
495 },
496 tls: tls.unwrap_or_default(),
497 notify: notify.unwrap_or_default(),
498 apq: apq.unwrap_or_default(),
499 persisted_queries: persisted_query.unwrap_or_default(),
500 uplink,
501 experimental_type_conditioned_fetching: experimental_type_conditioned_fetching
502 .unwrap_or_default(),
503 batching: batching.unwrap_or_default(),
504 raw_yaml: None,
505 };
506
507 configuration.validate()
508 }
509}
510
511impl Configuration {
512 pub(crate) fn validate(self) -> Result<Self, ConfigurationError> {
513 if self.sandbox.enabled && self.homepage.enabled {
515 return Err(ConfigurationError::InvalidConfiguration {
516 message: "sandbox and homepage cannot be enabled at the same time",
517 error: "disable the homepage if you want to enable sandbox".to_string(),
518 });
519 }
520 if self.sandbox.enabled && !self.supergraph.introspection {
522 return Err(ConfigurationError::InvalidConfiguration {
523 message: "sandbox requires introspection",
524 error: "sandbox needs introspection to be enabled".to_string(),
525 });
526 }
527 if !self.supergraph.path.starts_with('/') {
528 return Err(ConfigurationError::InvalidConfiguration {
529 message: "invalid 'server.graphql_path' configuration",
530 error: format!(
531 "'{}' is invalid, it must be an absolute path and start with '/', you should try with '/{}'",
532 self.supergraph.path, self.supergraph.path
533 ),
534 });
535 }
536 if self.supergraph.path.ends_with('*')
537 && !self.supergraph.path.ends_with("/*")
538 && !SUPERGRAPH_ENDPOINT_REGEX.is_match(&self.supergraph.path)
539 {
540 return Err(ConfigurationError::InvalidConfiguration {
541 message: "invalid 'server.graphql_path' configuration",
542 error: format!(
543 "'{}' is invalid, you can only set a wildcard after a '/'",
544 self.supergraph.path
545 ),
546 });
547 }
548 if self.supergraph.path.contains("/*/") {
549 return Err(ConfigurationError::InvalidConfiguration {
550 message: "invalid 'server.graphql_path' configuration",
551 error: format!(
552 "'{}' is invalid, if you need to set a path like '/*/graphql' then specify it as a path parameter with a name, for example '/:my_project_key/graphql'",
553 self.supergraph.path
554 ),
555 });
556 }
557
558 if self.persisted_queries.enabled {
560 if self.persisted_queries.safelist.enabled && self.apq.enabled {
561 return Err(ConfigurationError::InvalidConfiguration {
562 message: "apqs must be disabled to enable safelisting",
563 error: "either set persisted_queries.safelist.enabled: false or apq.enabled: false in your router yaml configuration".into()
564 });
565 } else if !self.persisted_queries.safelist.enabled
566 && self.persisted_queries.safelist.require_id
567 {
568 return Err(ConfigurationError::InvalidConfiguration {
569 message: "safelist must be enabled to require IDs",
570 error: "either set persisted_queries.safelist.enabled: true or persisted_queries.safelist.require_id: false in your router yaml configuration".into()
571 });
572 }
573 } else {
574 if self.persisted_queries.safelist.enabled {
576 return Err(ConfigurationError::InvalidConfiguration {
577 message: "persisted queries must be enabled to enable safelisting",
578 error: "either set persisted_queries.safelist.enabled: false or persisted_queries.enabled: true in your router yaml configuration".into()
579 });
580 } else if self.persisted_queries.log_unknown {
581 return Err(ConfigurationError::InvalidConfiguration {
582 message: "persisted queries must be enabled to enable logging unknown operations",
583 error: "either set persisted_queries.log_unknown: false or persisted_queries.enabled: true in your router yaml configuration".into()
584 });
585 }
586 }
587
588 if self.apollo_plugin_enabled("preview_response_cache")
590 && self.apollo_plugin_enabled("preview_entity_cache")
591 {
592 return Err(ConfigurationError::InvalidConfiguration {
593 message: "entity cache and response cache features are mutually exclusive",
594 error: "either set preview_response_cache.enabled: false or preview_entity_cache.enabled: false in your router yaml configuration".into(),
595 });
596 }
597
598 Ok(self)
599 }
600}
601
602impl FromStr for Configuration {
604 type Err = ConfigurationError;
605
606 fn from_str(s: &str) -> Result<Self, Self::Err> {
607 schema::validate_yaml_configuration(s, Expansion::default()?, schema::Mode::Upgrade)?
608 .validate()
609 }
610}
611
612fn gen_schema(
613 plugins: BTreeMap<String, Schema>,
614 hidden_plugins: Option<BTreeMap<String, Schema>>,
615) -> Schema {
616 schemars::json_schema!({
617 "type": "object",
618 "properties": plugins,
619 "additionalProperties": false,
620 "patternProperties": hidden_plugins
621 .unwrap_or_default()
622 .into_iter()
623 .map(|(k, v)| (format!("^{}$", k), v))
625 .collect::<BTreeMap<_, _>>()
626 })
627}
628
629#[derive(Clone, Debug, Default, Deserialize, Serialize)]
635#[serde(transparent)]
636pub(crate) struct ApolloPlugins {
637 pub(crate) plugins: Map<String, Value>,
638}
639
640impl JsonSchema for ApolloPlugins {
641 fn schema_name() -> std::borrow::Cow<'static, str> {
642 stringify!(Plugins).into()
643 }
644
645 fn json_schema(generator: &mut SchemaGenerator) -> Schema {
646 let (plugin_entries, hidden_plugin_entries): (Vec<_>, Vec<_>) = crate::plugin::plugins()
650 .sorted_by_key(|factory| factory.name.clone())
651 .filter(|factory| factory.name.starts_with(APOLLO_PLUGIN_PREFIX))
652 .partition_map(|factory| {
653 let key = factory.name[APOLLO_PLUGIN_PREFIX.len()..].to_string();
654 let schema = factory.create_schema(generator);
655 if factory.hidden_from_config_json_schema {
657 Either::Right((key, schema))
658 } else {
659 Either::Left((key, schema))
660 }
661 });
662 gen_schema(
663 plugin_entries.into_iter().collect(),
664 Some(hidden_plugin_entries.into_iter().collect()),
665 )
666 }
667}
668
669#[derive(Clone, Debug, Default, Deserialize, Serialize)]
674#[serde(transparent)]
675pub(crate) struct UserPlugins {
676 pub(crate) plugins: Option<Map<String, Value>>,
677}
678
679impl JsonSchema for UserPlugins {
680 fn schema_name() -> std::borrow::Cow<'static, str> {
681 stringify!(Plugins).into()
682 }
683
684 fn json_schema(generator: &mut SchemaGenerator) -> Schema {
685 let plugins = crate::plugin::plugins()
689 .sorted_by_key(|factory| factory.name.clone())
690 .filter(|factory| !factory.name.starts_with(APOLLO_PLUGIN_PREFIX))
691 .map(|factory| (factory.name.to_string(), factory.create_schema(generator)))
692 .collect();
693 gen_schema(plugins, None)
694 }
695}
696
697#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
699#[serde(deny_unknown_fields)]
700#[serde(default)]
701pub(crate) struct Supergraph {
702 pub(crate) listen: ListenAddr,
705
706 #[serde(deserialize_with = "humantime_serde::deserialize")]
708 #[schemars(with = "String", default = "default_connection_shutdown_timeout")]
709 pub(crate) connection_shutdown_timeout: Duration,
710
711 pub(crate) path: String,
714
715 pub(crate) introspection: bool,
718
719 pub(crate) generate_query_fragments: bool,
722
723 pub(crate) defer_support: bool,
725
726 pub(crate) query_planning: QueryPlanning,
728
729 pub(crate) early_cancel: bool,
734
735 pub(crate) enable_result_coercion_errors: bool,
742
743 pub(crate) experimental_log_on_broken_pipe: bool,
746}
747
748const fn default_generate_query_fragments() -> bool {
749 true
750}
751
752fn default_defer_support() -> bool {
753 true
754}
755
756#[buildstructor::buildstructor]
757impl Supergraph {
758 #[builder]
759 pub(crate) fn new(
760 listen: Option<ListenAddr>,
761 path: Option<String>,
762 connection_shutdown_timeout: Option<Duration>,
763 introspection: Option<bool>,
764 defer_support: Option<bool>,
765 query_planning: Option<QueryPlanning>,
766 generate_query_fragments: Option<bool>,
767 early_cancel: Option<bool>,
768 experimental_log_on_broken_pipe: Option<bool>,
769 insert_result_coercion_errors: Option<bool>,
770 ) -> Self {
771 Self {
772 listen: listen.unwrap_or_else(default_graphql_listen),
773 path: path.unwrap_or_else(default_graphql_path),
774 connection_shutdown_timeout: connection_shutdown_timeout
775 .unwrap_or_else(default_connection_shutdown_timeout),
776 introspection: introspection.unwrap_or_else(default_graphql_introspection),
777 defer_support: defer_support.unwrap_or_else(default_defer_support),
778 query_planning: query_planning.unwrap_or_default(),
779 generate_query_fragments: generate_query_fragments
780 .unwrap_or_else(default_generate_query_fragments),
781 early_cancel: early_cancel.unwrap_or_default(),
782 experimental_log_on_broken_pipe: experimental_log_on_broken_pipe.unwrap_or_default(),
783 enable_result_coercion_errors: insert_result_coercion_errors.unwrap_or_default(),
784 }
785 }
786}
787
788#[cfg(test)]
789#[buildstructor::buildstructor]
790impl Supergraph {
791 #[builder]
792 pub(crate) fn fake_new(
793 listen: Option<ListenAddr>,
794 path: Option<String>,
795 connection_shutdown_timeout: Option<Duration>,
796 introspection: Option<bool>,
797 defer_support: Option<bool>,
798 query_planning: Option<QueryPlanning>,
799 generate_query_fragments: Option<bool>,
800 early_cancel: Option<bool>,
801 experimental_log_on_broken_pipe: Option<bool>,
802 insert_result_coercion_errors: Option<bool>,
803 ) -> Self {
804 Self {
805 listen: listen.unwrap_or_else(test_listen),
806 path: path.unwrap_or_else(default_graphql_path),
807 connection_shutdown_timeout: connection_shutdown_timeout
808 .unwrap_or_else(default_connection_shutdown_timeout),
809 introspection: introspection.unwrap_or_else(default_graphql_introspection),
810 defer_support: defer_support.unwrap_or_else(default_defer_support),
811 query_planning: query_planning.unwrap_or_default(),
812 generate_query_fragments: generate_query_fragments
813 .unwrap_or_else(default_generate_query_fragments),
814 early_cancel: early_cancel.unwrap_or_default(),
815 experimental_log_on_broken_pipe: experimental_log_on_broken_pipe.unwrap_or_default(),
816 enable_result_coercion_errors: insert_result_coercion_errors.unwrap_or_default(),
817 }
818 }
819}
820
821impl Default for Supergraph {
822 fn default() -> Self {
823 Self::builder().build()
824 }
825}
826
827impl Supergraph {
828 pub(crate) fn sanitized_path(&self) -> String {
830 let mut path = self.path.clone();
831 if self.path.ends_with("/*") {
832 path = format!("{}router_extra_path", self.path);
834 } else if SUPERGRAPH_ENDPOINT_REGEX.is_match(&self.path) {
835 let new_path = SUPERGRAPH_ENDPOINT_REGEX
836 .replace(&self.path, "${first_path}${sub_path}{supergraph_route}");
837 path = new_path.to_string();
838 }
839
840 path
841 }
842}
843
844#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)]
846#[serde(deny_unknown_fields)]
847pub(crate) struct Router {
848 #[serde(default)]
849 pub(crate) cache: Cache,
850}
851
852#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
854#[serde(deny_unknown_fields, default)]
855pub(crate) struct Apq {
856 pub(crate) enabled: bool,
858
859 pub(crate) router: Router,
860
861 pub(crate) subgraph: SubgraphConfiguration<SubgraphApq>,
862}
863
864#[cfg(test)]
865#[buildstructor::buildstructor]
866impl Apq {
867 #[builder]
868 pub(crate) fn fake_new(enabled: Option<bool>) -> Self {
869 Self {
870 enabled: enabled.unwrap_or_else(default_apq),
871 ..Default::default()
872 }
873 }
874}
875
876#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
878#[serde(deny_unknown_fields, default)]
879pub(crate) struct SubgraphApq {
880 pub(crate) enabled: bool,
882}
883
884fn default_apq() -> bool {
885 true
886}
887
888impl Default for Apq {
889 fn default() -> Self {
890 Self {
891 enabled: default_apq(),
892 router: Default::default(),
893 subgraph: Default::default(),
894 }
895 }
896}
897
898#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)]
900#[serde(deny_unknown_fields, default)]
901pub(crate) struct QueryPlanning {
902 pub(crate) cache: QueryPlanCache,
904 pub(crate) warmed_up_queries: Option<usize>,
909
910 pub(crate) experimental_plans_limit: Option<u32>,
918
919 pub(crate) experimental_paths_limit: Option<u32>,
932
933 pub(crate) experimental_reuse_query_plans: bool,
936
937 pub(crate) experimental_cooperative_cancellation: CooperativeCancellation,
941}
942
943#[buildstructor::buildstructor]
944impl QueryPlanning {
945 #[builder]
946 #[allow(dead_code)]
947 pub(crate) fn new(
948 cache: Option<QueryPlanCache>,
949 warmed_up_queries: Option<usize>,
950 experimental_plans_limit: Option<u32>,
951 experimental_paths_limit: Option<u32>,
952 experimental_reuse_query_plans: Option<bool>,
953 experimental_cooperative_cancellation: Option<CooperativeCancellation>,
954 ) -> Self {
955 Self {
956 cache: cache.unwrap_or_default(),
957 warmed_up_queries,
958 experimental_plans_limit,
959 experimental_paths_limit,
960 experimental_reuse_query_plans: experimental_reuse_query_plans.unwrap_or_default(),
961 experimental_cooperative_cancellation: experimental_cooperative_cancellation
962 .unwrap_or_default(),
963 }
964 }
965}
966
967#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
969#[serde(deny_unknown_fields, default)]
970pub(crate) struct QueryPlanCache {
971 pub(crate) in_memory: InMemoryCache,
973 pub(crate) redis: Option<QueryPlanRedisCache>,
975}
976
977#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
978#[serde(deny_unknown_fields)]
979pub(crate) struct QueryPlanRedisCache {
981 pub(crate) urls: Vec<url::Url>,
983
984 pub(crate) username: Option<String>,
986 pub(crate) password: Option<String>,
988
989 #[serde(
990 deserialize_with = "humantime_serde::deserialize",
991 default = "default_timeout"
992 )]
993 #[schemars(with = "Option<String>", default)]
994 pub(crate) timeout: Duration,
996
997 #[serde(
998 deserialize_with = "humantime_serde::deserialize",
999 default = "default_query_plan_cache_ttl"
1000 )]
1001 #[schemars(with = "Option<String>", default = "default_query_plan_cache_ttl")]
1002 pub(crate) ttl: Duration,
1004
1005 pub(crate) namespace: Option<String>,
1007
1008 #[serde(default)]
1009 pub(crate) tls: Option<TlsClient>,
1011
1012 #[serde(default = "default_required_to_start")]
1013 pub(crate) required_to_start: bool,
1015
1016 #[serde(default = "default_reset_ttl")]
1017 pub(crate) reset_ttl: bool,
1019
1020 #[serde(default = "default_query_planner_cache_pool_size")]
1021 pub(crate) pool_size: u32,
1023}
1024
1025fn default_query_plan_cache_ttl() -> Duration {
1026 Duration::from_secs(86400 * 30)
1028}
1029
1030fn default_query_planner_cache_pool_size() -> u32 {
1031 1
1032}
1033
1034#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1036#[serde(deny_unknown_fields, default)]
1037pub(crate) struct Cache {
1038 pub(crate) in_memory: InMemoryCache,
1040 pub(crate) redis: Option<RedisCache>,
1042}
1043
1044impl From<QueryPlanCache> for Cache {
1045 fn from(value: QueryPlanCache) -> Self {
1046 Cache {
1047 in_memory: value.in_memory,
1048 redis: value.redis.map(Into::into),
1049 }
1050 }
1051}
1052
1053#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1054#[serde(deny_unknown_fields)]
1055pub(crate) struct InMemoryCache {
1057 pub(crate) limit: NonZeroUsize,
1059}
1060
1061impl Default for InMemoryCache {
1062 fn default() -> Self {
1063 Self {
1064 limit: DEFAULT_CACHE_CAPACITY,
1065 }
1066 }
1067}
1068
1069#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1070#[serde(deny_unknown_fields)]
1071pub(crate) struct RedisCache {
1073 pub(crate) urls: Vec<url::Url>,
1075
1076 pub(crate) username: Option<String>,
1078 pub(crate) password: Option<String>,
1080
1081 #[serde(
1082 deserialize_with = "humantime_serde::deserialize",
1083 default = "default_timeout"
1084 )]
1085 #[schemars(with = "Option<String>", default)]
1086 pub(crate) timeout: Duration,
1088
1089 #[serde(deserialize_with = "humantime_serde::deserialize", default)]
1090 #[schemars(with = "Option<String>", default)]
1091 pub(crate) ttl: Option<Duration>,
1093
1094 pub(crate) namespace: Option<String>,
1096
1097 #[serde(default)]
1098 pub(crate) tls: Option<TlsClient>,
1100
1101 #[serde(default = "default_required_to_start")]
1102 pub(crate) required_to_start: bool,
1104
1105 #[serde(default = "default_reset_ttl")]
1106 pub(crate) reset_ttl: bool,
1108
1109 #[serde(default = "default_pool_size")]
1110 pub(crate) pool_size: u32,
1112 #[serde(
1113 deserialize_with = "humantime_serde::deserialize",
1114 default = "default_metrics_interval"
1115 )]
1116 #[schemars(with = "Option<String>", default)]
1117 pub(crate) metrics_interval: Duration,
1119}
1120
1121fn default_timeout() -> Duration {
1122 Duration::from_millis(500)
1123}
1124
1125pub(crate) fn default_required_to_start() -> bool {
1126 false
1127}
1128
1129pub(crate) fn default_pool_size() -> u32 {
1130 1
1131}
1132
1133pub(crate) fn default_metrics_interval() -> Duration {
1134 Duration::from_secs(1)
1135}
1136
1137impl From<QueryPlanRedisCache> for RedisCache {
1138 fn from(value: QueryPlanRedisCache) -> Self {
1139 RedisCache {
1140 urls: value.urls,
1141 username: value.username,
1142 password: value.password,
1143 timeout: value.timeout,
1144 ttl: Some(value.ttl),
1145 namespace: value.namespace,
1146 tls: value.tls,
1147 required_to_start: value.required_to_start,
1148 reset_ttl: value.reset_ttl,
1149 pool_size: value.pool_size,
1150 metrics_interval: default_metrics_interval(),
1151 }
1152 }
1153}
1154
1155fn default_reset_ttl() -> bool {
1156 true
1157}
1158
1159#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1161#[serde(deny_unknown_fields)]
1162#[serde(default)]
1163pub(crate) struct Tls {
1164 pub(crate) supergraph: Option<Arc<TlsSupergraph>>,
1168 pub(crate) subgraph: SubgraphConfiguration<TlsClient>,
1170 pub(crate) connector: ConnectorConfiguration<TlsClient>,
1172}
1173
1174#[derive(Debug, Deserialize, Serialize, JsonSchema)]
1176#[serde(deny_unknown_fields)]
1177pub(crate) struct TlsSupergraph {
1178 #[serde(deserialize_with = "deserialize_certificate", skip_serializing)]
1180 #[schemars(with = "String")]
1181 pub(crate) certificate: CertificateDer<'static>,
1182 #[serde(deserialize_with = "deserialize_key", skip_serializing)]
1184 #[schemars(with = "String")]
1185 pub(crate) key: PrivateKeyDer<'static>,
1186 #[serde(deserialize_with = "deserialize_certificate_chain", skip_serializing)]
1188 #[schemars(with = "String")]
1189 pub(crate) certificate_chain: Vec<CertificateDer<'static>>,
1190}
1191
1192impl TlsSupergraph {
1193 pub(crate) fn tls_config(&self) -> Result<Arc<rustls::ServerConfig>, ApolloRouterError> {
1194 let mut certificates = vec![self.certificate.clone()];
1195 certificates.extend(self.certificate_chain.iter().cloned());
1196
1197 let mut config = ServerConfig::builder()
1198 .with_no_client_auth()
1199 .with_single_cert(certificates, self.key.clone_key())
1200 .map_err(ApolloRouterError::Rustls)?;
1201 config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
1202
1203 Ok(Arc::new(config))
1204 }
1205}
1206
1207fn deserialize_certificate<'de, D>(deserializer: D) -> Result<CertificateDer<'static>, D::Error>
1208where
1209 D: Deserializer<'de>,
1210{
1211 let data = String::deserialize(deserializer)?;
1212
1213 load_certs(&data)
1214 .map_err(serde::de::Error::custom)
1215 .and_then(|mut certs| {
1216 if certs.len() > 1 {
1217 Err(serde::de::Error::custom("expected exactly one certificate"))
1218 } else {
1219 certs
1220 .pop()
1221 .ok_or(serde::de::Error::custom("expected exactly one certificate"))
1222 }
1223 })
1224}
1225
1226fn deserialize_certificate_chain<'de, D>(
1227 deserializer: D,
1228) -> Result<Vec<CertificateDer<'static>>, D::Error>
1229where
1230 D: Deserializer<'de>,
1231{
1232 let data = String::deserialize(deserializer)?;
1233
1234 load_certs(&data).map_err(serde::de::Error::custom)
1235}
1236
1237fn deserialize_key<'de, D>(deserializer: D) -> Result<PrivateKeyDer<'static>, D::Error>
1238where
1239 D: Deserializer<'de>,
1240{
1241 let data = String::deserialize(deserializer)?;
1242
1243 load_key(&data).map_err(serde::de::Error::custom)
1244}
1245
1246#[derive(thiserror::Error, Debug)]
1247#[error("could not load TLS certificate: {0}")]
1248struct LoadCertError(std::io::Error);
1249
1250pub(crate) fn load_certs(data: &str) -> io::Result<Vec<CertificateDer<'static>>> {
1251 rustls_pemfile::certs(&mut BufReader::new(data.as_bytes()))
1252 .collect::<Result<Vec<_>, _>>()
1253 .map_err(|error| io::Error::new(io::ErrorKind::InvalidInput, LoadCertError(error)))
1254}
1255
1256pub(crate) fn load_key(data: &str) -> io::Result<PrivateKeyDer<'static>> {
1257 let mut reader = BufReader::new(data.as_bytes());
1258 let mut key_iterator = iter::from_fn(|| rustls_pemfile::read_one(&mut reader).transpose());
1259
1260 let private_key = match key_iterator.next() {
1261 Some(Ok(rustls_pemfile::Item::Pkcs1Key(key))) => PrivateKeyDer::from(key),
1262 Some(Ok(rustls_pemfile::Item::Pkcs8Key(key))) => PrivateKeyDer::from(key),
1263 Some(Ok(rustls_pemfile::Item::Sec1Key(key))) => PrivateKeyDer::from(key),
1264 Some(Err(e)) => {
1265 return Err(io::Error::new(
1266 io::ErrorKind::InvalidInput,
1267 format!("could not parse the key: {e}"),
1268 ));
1269 }
1270 Some(_) => {
1271 return Err(io::Error::new(
1272 io::ErrorKind::InvalidInput,
1273 "expected a private key",
1274 ));
1275 }
1276 None => {
1277 return Err(io::Error::new(
1278 io::ErrorKind::InvalidInput,
1279 "could not find a private key",
1280 ));
1281 }
1282 };
1283
1284 if key_iterator.next().is_some() {
1285 return Err(io::Error::new(
1286 io::ErrorKind::InvalidInput,
1287 "expected exactly one private key",
1288 ));
1289 }
1290 Ok(private_key)
1291}
1292
1293#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1295#[serde(deny_unknown_fields)]
1296#[serde(default)]
1297pub(crate) struct TlsClient {
1298 pub(crate) certificate_authorities: Option<String>,
1300 pub(crate) client_authentication: Option<Arc<TlsClientAuth>>,
1302}
1303
1304#[buildstructor::buildstructor]
1305impl TlsClient {
1306 #[builder]
1307 pub(crate) fn new(
1308 certificate_authorities: Option<String>,
1309 client_authentication: Option<Arc<TlsClientAuth>>,
1310 ) -> Self {
1311 Self {
1312 certificate_authorities,
1313 client_authentication,
1314 }
1315 }
1316}
1317
1318impl Default for TlsClient {
1319 fn default() -> Self {
1320 Self::builder().build()
1321 }
1322}
1323
1324#[derive(Debug, Deserialize, Serialize, JsonSchema)]
1326#[serde(deny_unknown_fields)]
1327pub(crate) struct TlsClientAuth {
1328 #[serde(deserialize_with = "deserialize_certificate_chain", skip_serializing)]
1330 #[schemars(with = "String")]
1331 pub(crate) certificate_chain: Vec<CertificateDer<'static>>,
1332 #[serde(deserialize_with = "deserialize_key", skip_serializing)]
1334 #[schemars(with = "String")]
1335 pub(crate) key: PrivateKeyDer<'static>,
1336}
1337
1338#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1340#[serde(deny_unknown_fields)]
1341#[serde(default)]
1342pub(crate) struct Sandbox {
1343 pub(crate) enabled: bool,
1345}
1346
1347fn default_sandbox() -> bool {
1348 false
1349}
1350
1351#[buildstructor::buildstructor]
1352impl Sandbox {
1353 #[builder]
1354 pub(crate) fn new(enabled: Option<bool>) -> Self {
1355 Self {
1356 enabled: enabled.unwrap_or_else(default_sandbox),
1357 }
1358 }
1359}
1360
1361#[cfg(test)]
1362#[buildstructor::buildstructor]
1363impl Sandbox {
1364 #[builder]
1365 pub(crate) fn fake_new(enabled: Option<bool>) -> Self {
1366 Self {
1367 enabled: enabled.unwrap_or_else(default_sandbox),
1368 }
1369 }
1370}
1371
1372impl Default for Sandbox {
1373 fn default() -> Self {
1374 Self::builder().build()
1375 }
1376}
1377
1378#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1380#[serde(deny_unknown_fields)]
1381#[serde(default)]
1382pub(crate) struct Homepage {
1383 pub(crate) enabled: bool,
1385 pub(crate) graph_ref: Option<String>,
1388}
1389
1390fn default_homepage() -> bool {
1391 true
1392}
1393
1394#[buildstructor::buildstructor]
1395impl Homepage {
1396 #[builder]
1397 pub(crate) fn new(enabled: Option<bool>) -> Self {
1398 Self {
1399 enabled: enabled.unwrap_or_else(default_homepage),
1400 graph_ref: None,
1401 }
1402 }
1403}
1404
1405#[cfg(test)]
1406#[buildstructor::buildstructor]
1407impl Homepage {
1408 #[builder]
1409 pub(crate) fn fake_new(enabled: Option<bool>) -> Self {
1410 Self {
1411 enabled: enabled.unwrap_or_else(default_homepage),
1412 graph_ref: None,
1413 }
1414 }
1415}
1416
1417impl Default for Homepage {
1418 fn default() -> Self {
1419 Self::builder().enabled(default_homepage()).build()
1420 }
1421}
1422
1423#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema)]
1425#[serde(untagged)]
1426pub enum ListenAddr {
1427 SocketAddr(SocketAddr),
1429 #[cfg(unix)]
1431 UnixSocket(std::path::PathBuf),
1432}
1433
1434impl ListenAddr {
1435 pub(crate) fn ip_and_port(&self) -> Option<(IpAddr, u16)> {
1436 #[cfg_attr(not(unix), allow(irrefutable_let_patterns))]
1437 if let Self::SocketAddr(addr) = self {
1438 Some((addr.ip(), addr.port()))
1439 } else {
1440 None
1441 }
1442 }
1443}
1444
1445impl From<SocketAddr> for ListenAddr {
1446 fn from(addr: SocketAddr) -> Self {
1447 Self::SocketAddr(addr)
1448 }
1449}
1450
1451#[allow(clippy::from_over_into)]
1452impl Into<serde_json::Value> for ListenAddr {
1453 fn into(self) -> serde_json::Value {
1454 match self {
1455 Self::SocketAddr(addr) => serde_json::Value::String(addr.to_string()),
1458 #[cfg(unix)]
1459 Self::UnixSocket(path) => serde_json::Value::String(
1460 path.as_os_str()
1461 .to_str()
1462 .expect("unsupported non-UTF-8 path")
1463 .to_string(),
1464 ),
1465 }
1466 }
1467}
1468
1469#[cfg(unix)]
1470impl From<tokio_util::either::Either<std::net::SocketAddr, tokio::net::unix::SocketAddr>>
1471 for ListenAddr
1472{
1473 fn from(
1474 addr: tokio_util::either::Either<std::net::SocketAddr, tokio::net::unix::SocketAddr>,
1475 ) -> Self {
1476 match addr {
1477 tokio_util::either::Either::Left(addr) => Self::SocketAddr(addr),
1478 tokio_util::either::Either::Right(addr) => Self::UnixSocket(
1479 addr.as_pathname()
1480 .map(ToOwned::to_owned)
1481 .unwrap_or_default(),
1482 ),
1483 }
1484 }
1485}
1486
1487impl fmt::Display for ListenAddr {
1488 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1489 match self {
1490 Self::SocketAddr(addr) => write!(f, "http://{addr}"),
1491 #[cfg(unix)]
1492 Self::UnixSocket(path) => write!(f, "{}", path.display()),
1493 }
1494 }
1495}
1496
1497fn default_graphql_path() -> String {
1498 String::from("/")
1499}
1500
1501fn default_graphql_introspection() -> bool {
1502 false
1503}
1504
1505fn default_connection_shutdown_timeout() -> Duration {
1506 Duration::from_secs(60)
1507}
1508
1509#[derive(Clone, Debug, Default, Error, Display, Serialize, Deserialize, JsonSchema)]
1510#[serde(deny_unknown_fields, rename_all = "snake_case")]
1511pub(crate) enum BatchingMode {
1512 #[default]
1514 BatchHttpLink,
1515}
1516
1517#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1519#[serde(deny_unknown_fields)]
1520pub(crate) struct Batching {
1521 #[serde(default)]
1523 pub(crate) enabled: bool,
1524
1525 pub(crate) mode: BatchingMode,
1527
1528 pub(crate) subgraph: Option<SubgraphConfiguration<CommonBatchingConfig>>,
1530
1531 #[serde(default)]
1533 pub(crate) maximum_size: Option<usize>,
1534}
1535
1536#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1538pub(crate) struct CommonBatchingConfig {
1539 pub(crate) enabled: bool,
1541}
1542
1543impl Batching {
1544 pub(crate) fn batch_include(&self, service_name: &str) -> bool {
1546 match &self.subgraph {
1547 Some(subgraph_batching_config) => {
1548 if subgraph_batching_config.all.enabled {
1550 subgraph_batching_config
1554 .subgraphs
1555 .get(service_name)
1556 .is_none_or(|x| x.enabled)
1557 } else {
1558 subgraph_batching_config
1561 .subgraphs
1562 .get(service_name)
1563 .is_some_and(|x| x.enabled)
1564 }
1565 }
1566 None => false,
1567 }
1568 }
1569
1570 pub(crate) fn exceeds_batch_size<T>(&self, batch: &[T]) -> bool {
1571 match self.maximum_size {
1572 Some(maximum_size) => batch.len() > maximum_size,
1573 None => false,
1574 }
1575 }
1576}