Skip to main content

apollo_router/
test_harness.rs

1//! Test harness and mocks for the Apollo Router.
2
3use std::collections::HashMap;
4use std::default::Default;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use tower::BoxError;
9use tower::ServiceBuilder;
10use tower::ServiceExt;
11use tower_http::trace::MakeSpan;
12use tracing_futures::Instrument;
13
14use crate::axum_factory::span_mode;
15use crate::axum_factory::utils::PropagatingMakeSpan;
16use crate::configuration::Configuration;
17use crate::configuration::ConfigurationError;
18use crate::graphql;
19use crate::plugin::DynPlugin;
20use crate::plugin::Plugin;
21use crate::plugin::PluginInit;
22use crate::plugin::PluginPrivate;
23use crate::plugin::PluginUnstable;
24use crate::plugin::test::MockSubgraph;
25use crate::plugin::test::canned;
26use crate::plugins::telemetry::reload::init_telemetry;
27use crate::router_factory::YamlRouterFactory;
28use crate::services::HasSchema;
29use crate::services::SupergraphCreator;
30use crate::services::execution;
31use crate::services::layers::persisted_queries::PersistedQueryLayer;
32use crate::services::layers::query_analysis::QueryAnalysisLayer;
33use crate::services::router;
34use crate::services::router::service::RouterCreator;
35use crate::services::subgraph;
36use crate::services::supergraph;
37use crate::spec::Schema;
38use crate::uplink::license_enforcement::LicenseState;
39
40/// Mocks for services the Apollo Router must integrate with.
41pub mod mocks;
42
43#[cfg(test)]
44pub(crate) mod http_client;
45
46/// Builder for the part of an Apollo Router that handles GraphQL requests, as a [`tower::Service`].
47///
48/// This allows tests, benchmarks, etc
49/// to manipulate request and response objects in memory
50/// without going over the network on the supergraph side.
51///
52/// On the subgraph side, this test harness never makes network requests to subgraphs
53/// unless [`with_subgraph_network_requests`][Self::with_subgraph_network_requests] is called.
54///
55/// Compared to running a full [`RouterHttpServer`][crate::RouterHttpServer],
56/// this test harness is lacking:
57///
58/// * Custom endpoints from plugins
59/// * The health check endpoint
60/// * CORS (FIXME: should this include CORS?)
61/// * HTTP compression
62///
63/// Example making a single request:
64///
65/// ```
66/// use apollo_router::services::supergraph;
67/// use apollo_router::TestHarness;
68/// use tower::util::ServiceExt;
69///
70/// # #[tokio::main] async fn main() -> Result<(), tower::BoxError> {
71/// let config = serde_json::json!({"supergraph": { "introspection": false }});
72/// let request = supergraph::Request::fake_builder()
73///     // Request building here
74///     .build()
75///     .unwrap();
76/// let response = TestHarness::builder()
77///     .configuration_json(config)?
78///     .build_router()
79///     .await?
80///     .oneshot(request.try_into().unwrap())
81///     .await?
82///     .next_response()
83///     .await
84///     .unwrap();
85/// # Ok(()) }
86/// ```
87pub struct TestHarness<'a> {
88    schema: Option<&'a str>,
89    configuration: Option<Arc<Configuration>>,
90    extra_plugins: Vec<(String, Box<dyn DynPlugin>)>,
91    subgraph_network_requests: bool,
92}
93
94// Not using buildstructor because `extra_plugin` has non-trivial signature and behavior
95impl<'a> TestHarness<'a> {
96    /// Creates a new builder.
97    pub fn builder() -> Self {
98        Self {
99            schema: None,
100            configuration: None,
101            extra_plugins: Vec::new(),
102            subgraph_network_requests: false,
103        }
104    }
105
106    /// Specifies the logging level. Note that this function may not be called more than once.
107    /// log_level is in RUST_LOG format.
108    pub fn log_level(self, log_level: &'a str) -> Self {
109        // manually filter salsa logs because some of them run at the INFO level https://github.com/salsa-rs/salsa/issues/425
110        let log_level = format!("{log_level},salsa=error");
111        init_telemetry(&log_level).expect("failed to setup logging");
112        self
113    }
114
115    /// Specifies the logging level. Note that this function will silently fail if called more than once.
116    /// log_level is in RUST_LOG format.
117    pub fn try_log_level(self, log_level: &'a str) -> Self {
118        // manually filter salsa logs because some of them run at the INFO level https://github.com/salsa-rs/salsa/issues/425
119        let log_level = format!("{log_level},salsa=error");
120        let _ = init_telemetry(&log_level);
121        self
122    }
123
124    /// Specifies the (static) supergraph schema definition.
125    ///
126    /// Panics if called more than once.
127    ///
128    /// If this isn’t called, a default “canned” schema is used.
129    /// It can be found in the Router repository at `apollo-router/testing_schema.graphql`.
130    /// In that case, subgraph responses are overridden with some “canned” data.
131    pub fn schema(mut self, schema: &'a str) -> Self {
132        assert!(self.schema.is_none(), "schema was specified twice");
133        self.schema = Some(schema);
134        self
135    }
136
137    /// Specifies the (static) router configuration.
138    pub fn configuration(mut self, configuration: Arc<Configuration>) -> Self {
139        assert!(
140            self.configuration.is_none(),
141            "configuration was specified twice"
142        );
143        self.configuration = Some(configuration);
144        self
145    }
146
147    /// Specifies the (static) router configuration as a JSON value,
148    /// such as from the `serde_json::json!` macro.
149    pub fn configuration_json(
150        self,
151        configuration: serde_json::Value,
152    ) -> Result<Self, serde_json::Error> {
153        let configuration: Configuration = serde_json::from_value(configuration)?;
154        Ok(self.configuration(Arc::new(configuration)))
155    }
156
157    /// Specifies the (static) router configuration as a YAML string,
158    /// such as from the `serde_json::json!` macro.
159    pub fn configuration_yaml(self, configuration: &'a str) -> Result<Self, ConfigurationError> {
160        let configuration: Configuration = Configuration::from_str(configuration)?;
161        Ok(self.configuration(Arc::new(configuration)))
162    }
163
164    /// Adds an extra, already instanciated plugin.
165    ///
166    /// May be called multiple times.
167    /// These extra plugins are added after plugins specified in configuration.
168    pub fn extra_plugin<P: Plugin>(mut self, plugin: P) -> Self {
169        let type_id = std::any::TypeId::of::<P>();
170        let name = match crate::plugin::plugins().find(|factory| factory.type_id == type_id) {
171            Some(factory) => factory.name.clone(),
172            None => format!(
173                "extra_plugins.{}.{}",
174                self.extra_plugins.len(),
175                std::any::type_name::<P>(),
176            ),
177        };
178
179        self.extra_plugins.push((name, plugin.into()));
180        self
181    }
182
183    /// Adds an extra, already instantiated unstable plugin.
184    ///
185    /// May be called multiple times.
186    /// These extra plugins are added after plugins specified in configuration.
187    pub fn extra_unstable_plugin<P: PluginUnstable>(mut self, plugin: P) -> Self {
188        let type_id = std::any::TypeId::of::<P>();
189        let name = match crate::plugin::plugins().find(|factory| factory.type_id == type_id) {
190            Some(factory) => factory.name.clone(),
191            None => format!(
192                "extra_plugins.{}.{}",
193                self.extra_plugins.len(),
194                std::any::type_name::<P>(),
195            ),
196        };
197
198        self.extra_plugins.push((name, Box::new(plugin)));
199        self
200    }
201
202    /// Adds an extra, already instantiated private plugin.
203    ///
204    /// May be called multiple times.
205    /// These extra plugins are added after plugins specified in configuration.
206    #[allow(dead_code)]
207    pub(crate) fn extra_private_plugin<P: PluginPrivate>(mut self, plugin: P) -> Self {
208        let type_id = std::any::TypeId::of::<P>();
209        let name = match crate::plugin::plugins().find(|factory| factory.type_id == type_id) {
210            Some(factory) => factory.name.clone(),
211            None => format!(
212                "extra_plugins.{}.{}",
213                self.extra_plugins.len(),
214                std::any::type_name::<P>(),
215            ),
216        };
217
218        self.extra_plugins.push((name, Box::new(plugin)));
219        self
220    }
221
222    /// Adds a callback-based hook similar to [`Plugin::router_service`]
223    pub fn router_hook(
224        self,
225        callback: impl Fn(router::BoxService) -> router::BoxService + Send + Sync + 'static,
226    ) -> Self {
227        self.extra_plugin(RouterServicePlugin(callback))
228    }
229
230    /// Adds a callback-based hook similar to [`Plugin::supergraph_service`]
231    pub fn supergraph_hook(
232        self,
233        callback: impl Fn(supergraph::BoxService) -> supergraph::BoxService + Send + Sync + 'static,
234    ) -> Self {
235        self.extra_plugin(SupergraphServicePlugin(callback))
236    }
237
238    /// Adds a callback-based hook similar to [`Plugin::execution_service`]
239    pub fn execution_hook(
240        self,
241        callback: impl Fn(execution::BoxService) -> execution::BoxService + Send + Sync + 'static,
242    ) -> Self {
243        self.extra_plugin(ExecutionServicePlugin(callback))
244    }
245
246    /// Adds a callback-based hook similar to [`Plugin::subgraph_service`]
247    pub fn subgraph_hook(
248        self,
249        callback: impl Fn(&str, subgraph::BoxService) -> subgraph::BoxService + Send + Sync + 'static,
250    ) -> Self {
251        self.extra_plugin(SubgraphServicePlugin(callback))
252    }
253
254    /// Enables this test harness to make network requests to subgraphs.
255    ///
256    /// If this is not called, all subgraph requests get an empty response by default
257    /// (unless [`schema`][Self::schema] is also not called).
258    /// This behavior can be changed by implementing [`Plugin::subgraph_service`]
259    /// on a plugin given to [`extra_plugin`][Self::extra_plugin].
260    pub fn with_subgraph_network_requests(mut self) -> Self {
261        self.subgraph_network_requests = true;
262        self
263    }
264
265    pub(crate) async fn build_common(
266        self,
267    ) -> Result<(Arc<Configuration>, SupergraphCreator), BoxError> {
268        let builder = if self.schema.is_none() {
269            self.subgraph_hook(|subgraph_name, default| match subgraph_name {
270                "products" => canned::products_subgraph().boxed(),
271                "accounts" => canned::accounts_subgraph().boxed(),
272                "reviews" => canned::reviews_subgraph().boxed(),
273                _ => default,
274            })
275        } else {
276            self
277        };
278        let mut config = builder.configuration.unwrap_or_default();
279        if !builder.subgraph_network_requests {
280            Arc::make_mut(&mut config)
281                .apollo_plugins
282                .plugins
283                .entry("experimental_mock_subgraphs")
284                .or_insert(serde_json::json!({}));
285        }
286        let canned_schema = include_str!("../testing_schema.graphql");
287        let schema = builder.schema.unwrap_or(canned_schema);
288        let schema = Arc::new(Schema::parse(schema, &config)?);
289        let supergraph_creator = YamlRouterFactory
290            .inner_create_supergraph(config.clone(), schema, None, Some(builder.extra_plugins))
291            .await?;
292
293        Ok((config, supergraph_creator))
294    }
295
296    /// Builds the supergraph service
297    #[deprecated = "use build_supergraph instead"]
298    pub async fn build(self) -> Result<supergraph::BoxCloneService, BoxError> {
299        self.build_supergraph().await
300    }
301
302    /// Builds the supergraph service
303    pub async fn build_supergraph(self) -> Result<supergraph::BoxCloneService, BoxError> {
304        let (_config, supergraph_creator) = self.build_common().await?;
305
306        Ok(tower::service_fn(move |request| {
307            let router = supergraph_creator.make();
308
309            async move { router.oneshot(request).await }
310        })
311        .boxed_clone())
312    }
313
314    /// Builds the router service
315    pub async fn build_router(self) -> Result<router::BoxCloneService, BoxError> {
316        let (config, supergraph_creator) = self.build_common().await?;
317        let router_creator = RouterCreator::new(
318            QueryAnalysisLayer::new(supergraph_creator.schema(), Arc::clone(&config)).await,
319            Arc::new(PersistedQueryLayer::new(&config).await.unwrap()),
320            Arc::new(supergraph_creator),
321            config.clone(),
322        )
323        .await
324        .unwrap();
325
326        Ok(tower::service_fn(move |request: router::Request| {
327            let router = ServiceBuilder::new().service(router_creator.make()).boxed();
328            let span = PropagatingMakeSpan {
329                license: LicenseState::default(),
330                span_mode: span_mode(&config),
331            }
332            .make_span(&request.router_request);
333            async move { router.oneshot(request).await }.instrument(span)
334        })
335        .boxed_clone())
336    }
337
338    #[cfg(test)]
339    pub(crate) async fn build_http_service(self) -> Result<HttpService, BoxError> {
340        use crate::axum_factory::ListenAddrAndRouter;
341        use crate::axum_factory::tests::make_axum_router;
342        use crate::router_factory::RouterFactory;
343
344        let (config, supergraph_creator) = self.build_common().await?;
345        let router_creator = RouterCreator::new(
346            QueryAnalysisLayer::new(supergraph_creator.schema(), Arc::clone(&config)).await,
347            Arc::new(PersistedQueryLayer::new(&config).await.unwrap()),
348            Arc::new(supergraph_creator),
349            config.clone(),
350        )
351        .await?;
352
353        let web_endpoints = router_creator.web_endpoints();
354
355        let live = Arc::new(std::sync::atomic::AtomicBool::new(false));
356        let ready = Arc::new(std::sync::atomic::AtomicBool::new(false));
357        let routers = make_axum_router(
358            live,
359            ready,
360            router_creator,
361            &config,
362            web_endpoints,
363            LicenseState::Unlicensed,
364        )?;
365        let ListenAddrAndRouter(_listener, router) = routers.main;
366        Ok(router.boxed())
367    }
368}
369
370/// An HTTP-level service, as would be given to Hyper’s server
371#[cfg(test)]
372pub(crate) type HttpService = tower::util::BoxService<
373    http::Request<crate::services::router::Body>,
374    http::Response<axum::body::BoxBody>,
375    std::convert::Infallible,
376>;
377
378struct RouterServicePlugin<F>(F);
379struct SupergraphServicePlugin<F>(F);
380struct ExecutionServicePlugin<F>(F);
381struct SubgraphServicePlugin<F>(F);
382
383#[async_trait::async_trait]
384impl<F> Plugin for RouterServicePlugin<F>
385where
386    F: 'static + Send + Sync + Fn(router::BoxService) -> router::BoxService,
387{
388    type Config = ();
389
390    async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
391        unreachable!()
392    }
393
394    fn router_service(&self, service: router::BoxService) -> router::BoxService {
395        (self.0)(service)
396    }
397}
398
399#[async_trait::async_trait]
400impl<F> Plugin for SupergraphServicePlugin<F>
401where
402    F: 'static + Send + Sync + Fn(supergraph::BoxService) -> supergraph::BoxService,
403{
404    type Config = ();
405
406    async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
407        unreachable!()
408    }
409
410    fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
411        (self.0)(service)
412    }
413}
414
415#[async_trait::async_trait]
416impl<F> Plugin for ExecutionServicePlugin<F>
417where
418    F: 'static + Send + Sync + Fn(execution::BoxService) -> execution::BoxService,
419{
420    type Config = ();
421
422    async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
423        unreachable!()
424    }
425
426    fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
427        (self.0)(service)
428    }
429}
430
431#[async_trait::async_trait]
432impl<F> Plugin for SubgraphServicePlugin<F>
433where
434    F: 'static + Send + Sync + Fn(&str, subgraph::BoxService) -> subgraph::BoxService,
435{
436    type Config = ();
437
438    async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
439        unreachable!()
440    }
441
442    fn subgraph_service(
443        &self,
444        subgraph_name: &str,
445        service: subgraph::BoxService,
446    ) -> subgraph::BoxService {
447        (self.0)(subgraph_name, service)
448    }
449}
450
451/// a list of subgraphs with pregenerated responses
452#[derive(Default)]
453pub struct MockedSubgraphs(pub(crate) HashMap<&'static str, MockSubgraph>);
454
455impl MockedSubgraphs {
456    /// adds a mocked subgraph to the list
457    pub fn insert(&mut self, name: &'static str, subgraph: MockSubgraph) {
458        self.0.insert(name, subgraph);
459    }
460}
461
462#[async_trait::async_trait]
463impl Plugin for MockedSubgraphs {
464    type Config = ();
465
466    async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
467        unreachable!()
468    }
469
470    fn subgraph_service(
471        &self,
472        subgraph_name: &str,
473        default: subgraph::BoxService,
474    ) -> subgraph::BoxService {
475        self.0
476            .get(subgraph_name)
477            .map(|service| service.clone().boxed())
478            .unwrap_or(default)
479    }
480}
481
482// This function takes a valid request and duplicates it (optionally, with a new operation
483// name) to create an array (batch) request.
484//
485// Note: It's important to make the operation name different to prevent race conditions in testing
486// where various tests assume the presence (or absence) of a test span.
487//
488// Detailed Explanation
489//
490// A batch sends a series of requests concurrently through a router. If we
491// simply duplicate the request, then there is significant chance that spans such as
492// "parse_query" won't appear because the document has already been parsed and is now in a cache.
493//
494// To explicitly avoid this, we add an operation name which will force the router to re-parse the
495// document since operation name is part of the parsed document cache key.
496//
497// This has been a significant cause of racy/flaky tests in the past.
498
499///
500/// Convert a graphql request into a batch of requests
501///
502/// This is helpful for testing batching functionality.
503/// Given a GraphQL request, generate an array containing the request and it's duplicate.
504///
505/// If an op_from_to is supplied, this will modify the duplicated request so that it uses the new
506/// operation name.
507///
508pub fn make_fake_batch(
509    input: http::Request<graphql::Request>,
510    op_from_to: Option<(&str, &str)>,
511) -> http::Request<crate::services::router::Body> {
512    input.map(|req| {
513        // Modify the request so that it is a valid array of requests.
514        let mut new_req = req.clone();
515
516        // If we were given an op_from_to, then try to modify the query to update the operation
517        // name from -> to.
518        // If our request doesn't have an operation name or we weren't given an op_from_to,
519        // just duplicate the request as is.
520        if let Some((from, to)) = op_from_to {
521            if let Some(operation_name) = &req.operation_name {
522                if operation_name == from {
523                    new_req.query = req.query.clone().map(|q| q.replace(from, to));
524                    new_req.operation_name = Some(to.to_string());
525                }
526            }
527        }
528
529        let mut json_bytes_req = serde_json::to_vec(&req).unwrap();
530        let mut json_bytes_new_req = serde_json::to_vec(&new_req).unwrap();
531
532        let mut result = vec![b'['];
533        result.append(&mut json_bytes_req);
534        result.push(b',');
535        result.append(&mut json_bytes_new_req);
536        result.push(b']');
537        crate::services::router::Body::from(result)
538    })
539}
540
541#[tokio::test]
542async fn test_intercept_subgraph_network_requests() {
543    use futures::StreamExt;
544    let request = crate::services::supergraph::Request::canned_builder()
545        .build()
546        .unwrap();
547    let response = TestHarness::builder()
548        .schema(include_str!("../testing_schema.graphql"))
549        .configuration_json(serde_json::json!({
550            "include_subgraph_errors": {
551                "all": true
552            }
553        }))
554        .unwrap()
555        .build_router()
556        .await
557        .unwrap()
558        .oneshot(request.try_into().unwrap())
559        .await
560        .unwrap()
561        .into_graphql_response_stream()
562        .await
563        .next()
564        .await
565        .unwrap()
566        .unwrap();
567    insta::assert_json_snapshot!(response, @r###"
568    {
569      "data": {
570        "topProducts": null
571      },
572      "errors": [
573        {
574          "message": "subgraph mock not configured",
575          "path": [],
576          "extensions": {
577            "code": "SUBGRAPH_MOCK_NOT_CONFIGURED",
578            "service": "products"
579          }
580        }
581      ]
582    }
583    "###);
584}