1use std::collections::HashMap;
4use std::default::Default;
5use std::str::FromStr;
6use std::sync::Arc;
7
8use tower::BoxError;
9use tower::ServiceBuilder;
10use tower::ServiceExt;
11use tower_http::trace::MakeSpan;
12use tracing_futures::Instrument;
13
14use crate::axum_factory::span_mode;
15use crate::axum_factory::utils::PropagatingMakeSpan;
16use crate::configuration::Configuration;
17use crate::configuration::ConfigurationError;
18use crate::graphql;
19use crate::plugin::DynPlugin;
20use crate::plugin::Plugin;
21use crate::plugin::PluginInit;
22use crate::plugin::PluginPrivate;
23use crate::plugin::PluginUnstable;
24use crate::plugin::test::MockSubgraph;
25use crate::plugin::test::canned;
26use crate::plugins::telemetry::reload::init_telemetry;
27use crate::router_factory::YamlRouterFactory;
28use crate::services::HasSchema;
29use crate::services::SupergraphCreator;
30use crate::services::execution;
31use crate::services::layers::persisted_queries::PersistedQueryLayer;
32use crate::services::layers::query_analysis::QueryAnalysisLayer;
33use crate::services::router;
34use crate::services::router::service::RouterCreator;
35use crate::services::subgraph;
36use crate::services::supergraph;
37use crate::spec::Schema;
38use crate::uplink::license_enforcement::LicenseState;
39
40pub mod mocks;
42
43#[cfg(test)]
44pub(crate) mod http_client;
45
/// Builder for the router services exercised by tests.
///
/// Each field is filled in by the builder methods of the `impl` block and consumed by the
/// `build_*` methods.
pub struct TestHarness<'a> {
    /// Supergraph schema text; when `None`, `build_common` falls back to the bundled
    /// `testing_schema.graphql`.
    schema: Option<&'a str>,
    /// Router configuration; when `None`, `build_common` uses the default configuration.
    configuration: Option<Arc<Configuration>>,
    /// Plugins registered through the `extra_*plugin` and `*_hook` methods, each paired
    /// with the name it is registered under.
    extra_plugins: Vec<(String, Box<dyn DynPlugin>)>,
    /// When `false`, `build_common` adds an `experimental_mock_subgraphs` plugin entry to
    /// the configuration unless one is already present.
    subgraph_network_requests: bool,
}
93
94impl<'a> TestHarness<'a> {
96 pub fn builder() -> Self {
98 Self {
99 schema: None,
100 configuration: None,
101 extra_plugins: Vec::new(),
102 subgraph_network_requests: false,
103 }
104 }
105
106 pub fn log_level(self, log_level: &'a str) -> Self {
109 let log_level = format!("{log_level},salsa=error");
111 init_telemetry(&log_level).expect("failed to setup logging");
112 self
113 }
114
115 pub fn try_log_level(self, log_level: &'a str) -> Self {
118 let log_level = format!("{log_level},salsa=error");
120 let _ = init_telemetry(&log_level);
121 self
122 }
123
124 pub fn schema(mut self, schema: &'a str) -> Self {
132 assert!(self.schema.is_none(), "schema was specified twice");
133 self.schema = Some(schema);
134 self
135 }
136
137 pub fn configuration(mut self, configuration: Arc<Configuration>) -> Self {
139 assert!(
140 self.configuration.is_none(),
141 "configuration was specified twice"
142 );
143 self.configuration = Some(configuration);
144 self
145 }
146
147 pub fn configuration_json(
150 self,
151 configuration: serde_json::Value,
152 ) -> Result<Self, serde_json::Error> {
153 let configuration: Configuration = serde_json::from_value(configuration)?;
154 Ok(self.configuration(Arc::new(configuration)))
155 }
156
157 pub fn configuration_yaml(self, configuration: &'a str) -> Result<Self, ConfigurationError> {
160 let configuration: Configuration = Configuration::from_str(configuration)?;
161 Ok(self.configuration(Arc::new(configuration)))
162 }
163
164 pub fn extra_plugin<P: Plugin>(mut self, plugin: P) -> Self {
169 let type_id = std::any::TypeId::of::<P>();
170 let name = match crate::plugin::plugins().find(|factory| factory.type_id == type_id) {
171 Some(factory) => factory.name.clone(),
172 None => format!(
173 "extra_plugins.{}.{}",
174 self.extra_plugins.len(),
175 std::any::type_name::<P>(),
176 ),
177 };
178
179 self.extra_plugins.push((name, plugin.into()));
180 self
181 }
182
183 pub fn extra_unstable_plugin<P: PluginUnstable>(mut self, plugin: P) -> Self {
188 let type_id = std::any::TypeId::of::<P>();
189 let name = match crate::plugin::plugins().find(|factory| factory.type_id == type_id) {
190 Some(factory) => factory.name.clone(),
191 None => format!(
192 "extra_plugins.{}.{}",
193 self.extra_plugins.len(),
194 std::any::type_name::<P>(),
195 ),
196 };
197
198 self.extra_plugins.push((name, Box::new(plugin)));
199 self
200 }
201
202 #[allow(dead_code)]
207 pub(crate) fn extra_private_plugin<P: PluginPrivate>(mut self, plugin: P) -> Self {
208 let type_id = std::any::TypeId::of::<P>();
209 let name = match crate::plugin::plugins().find(|factory| factory.type_id == type_id) {
210 Some(factory) => factory.name.clone(),
211 None => format!(
212 "extra_plugins.{}.{}",
213 self.extra_plugins.len(),
214 std::any::type_name::<P>(),
215 ),
216 };
217
218 self.extra_plugins.push((name, Box::new(plugin)));
219 self
220 }
221
222 pub fn router_hook(
224 self,
225 callback: impl Fn(router::BoxService) -> router::BoxService + Send + Sync + 'static,
226 ) -> Self {
227 self.extra_plugin(RouterServicePlugin(callback))
228 }
229
230 pub fn supergraph_hook(
232 self,
233 callback: impl Fn(supergraph::BoxService) -> supergraph::BoxService + Send + Sync + 'static,
234 ) -> Self {
235 self.extra_plugin(SupergraphServicePlugin(callback))
236 }
237
238 pub fn execution_hook(
240 self,
241 callback: impl Fn(execution::BoxService) -> execution::BoxService + Send + Sync + 'static,
242 ) -> Self {
243 self.extra_plugin(ExecutionServicePlugin(callback))
244 }
245
246 pub fn subgraph_hook(
248 self,
249 callback: impl Fn(&str, subgraph::BoxService) -> subgraph::BoxService + Send + Sync + 'static,
250 ) -> Self {
251 self.extra_plugin(SubgraphServicePlugin(callback))
252 }
253
254 pub fn with_subgraph_network_requests(mut self) -> Self {
261 self.subgraph_network_requests = true;
262 self
263 }
264
265 pub(crate) async fn build_common(
266 self,
267 ) -> Result<(Arc<Configuration>, SupergraphCreator), BoxError> {
268 let builder = if self.schema.is_none() {
269 self.subgraph_hook(|subgraph_name, default| match subgraph_name {
270 "products" => canned::products_subgraph().boxed(),
271 "accounts" => canned::accounts_subgraph().boxed(),
272 "reviews" => canned::reviews_subgraph().boxed(),
273 _ => default,
274 })
275 } else {
276 self
277 };
278 let mut config = builder.configuration.unwrap_or_default();
279 if !builder.subgraph_network_requests {
280 Arc::make_mut(&mut config)
281 .apollo_plugins
282 .plugins
283 .entry("experimental_mock_subgraphs")
284 .or_insert(serde_json::json!({}));
285 }
286 let canned_schema = include_str!("../testing_schema.graphql");
287 let schema = builder.schema.unwrap_or(canned_schema);
288 let schema = Arc::new(Schema::parse(schema, &config)?);
289 let supergraph_creator = YamlRouterFactory
290 .inner_create_supergraph(config.clone(), schema, None, Some(builder.extra_plugins))
291 .await?;
292
293 Ok((config, supergraph_creator))
294 }
295
    /// Deprecated alias for `build_supergraph`: forwards to it and returns its result
    /// unchanged.
    #[deprecated = "use build_supergraph instead"]
    pub async fn build(self) -> Result<supergraph::BoxCloneService, BoxError> {
        self.build_supergraph().await
    }
301
302 pub async fn build_supergraph(self) -> Result<supergraph::BoxCloneService, BoxError> {
304 let (_config, supergraph_creator) = self.build_common().await?;
305
306 Ok(tower::service_fn(move |request| {
307 let router = supergraph_creator.make();
308
309 async move { router.oneshot(request).await }
310 })
311 .boxed_clone())
312 }
313
314 pub async fn build_router(self) -> Result<router::BoxCloneService, BoxError> {
316 let (config, supergraph_creator) = self.build_common().await?;
317 let router_creator = RouterCreator::new(
318 QueryAnalysisLayer::new(supergraph_creator.schema(), Arc::clone(&config)).await,
319 Arc::new(PersistedQueryLayer::new(&config).await.unwrap()),
320 Arc::new(supergraph_creator),
321 config.clone(),
322 )
323 .await
324 .unwrap();
325
326 Ok(tower::service_fn(move |request: router::Request| {
327 let router = ServiceBuilder::new().service(router_creator.make()).boxed();
328 let span = PropagatingMakeSpan {
329 license: LicenseState::default(),
330 span_mode: span_mode(&config),
331 }
332 .make_span(&request.router_request);
333 async move { router.oneshot(request).await }.instrument(span)
334 })
335 .boxed_clone())
336 }
337
338 #[cfg(test)]
339 pub(crate) async fn build_http_service(self) -> Result<HttpService, BoxError> {
340 use crate::axum_factory::ListenAddrAndRouter;
341 use crate::axum_factory::tests::make_axum_router;
342 use crate::router_factory::RouterFactory;
343
344 let (config, supergraph_creator) = self.build_common().await?;
345 let router_creator = RouterCreator::new(
346 QueryAnalysisLayer::new(supergraph_creator.schema(), Arc::clone(&config)).await,
347 Arc::new(PersistedQueryLayer::new(&config).await.unwrap()),
348 Arc::new(supergraph_creator),
349 config.clone(),
350 )
351 .await?;
352
353 let web_endpoints = router_creator.web_endpoints();
354
355 let live = Arc::new(std::sync::atomic::AtomicBool::new(false));
356 let ready = Arc::new(std::sync::atomic::AtomicBool::new(false));
357 let routers = make_axum_router(
358 live,
359 ready,
360 router_creator,
361 &config,
362 web_endpoints,
363 LicenseState::Unlicensed,
364 )?;
365 let ListenAddrAndRouter(_listener, router) = routers.main;
366 Ok(router.boxed())
367 }
368}
369
/// Boxed HTTP-level service returned by `TestHarness::build_http_service`: it takes an
/// `http::Request` carrying the router body type, yields an `http::Response` with an
/// axum boxed body, and has `Infallible` as its error type.
#[cfg(test)]
pub(crate) type HttpService = tower::util::BoxService<
    http::Request<crate::services::router::Body>,
    http::Response<axum::body::BoxBody>,
    std::convert::Infallible,
>;
377
// Newtype wrappers that turn a callback into a plugin; each is created by the
// `TestHarness` hook method of the matching name.

/// Wraps the callback given to `TestHarness::router_hook`.
struct RouterServicePlugin<F>(F);
/// Wraps the callback given to `TestHarness::supergraph_hook`.
struct SupergraphServicePlugin<F>(F);
/// Wraps the callback given to `TestHarness::execution_hook`.
struct ExecutionServicePlugin<F>(F);
/// Wraps the callback given to `TestHarness::subgraph_hook`.
struct SubgraphServicePlugin<F>(F);
382
383#[async_trait::async_trait]
384impl<F> Plugin for RouterServicePlugin<F>
385where
386 F: 'static + Send + Sync + Fn(router::BoxService) -> router::BoxService,
387{
388 type Config = ();
389
390 async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
391 unreachable!()
392 }
393
394 fn router_service(&self, service: router::BoxService) -> router::BoxService {
395 (self.0)(service)
396 }
397}
398
399#[async_trait::async_trait]
400impl<F> Plugin for SupergraphServicePlugin<F>
401where
402 F: 'static + Send + Sync + Fn(supergraph::BoxService) -> supergraph::BoxService,
403{
404 type Config = ();
405
406 async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
407 unreachable!()
408 }
409
410 fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
411 (self.0)(service)
412 }
413}
414
415#[async_trait::async_trait]
416impl<F> Plugin for ExecutionServicePlugin<F>
417where
418 F: 'static + Send + Sync + Fn(execution::BoxService) -> execution::BoxService,
419{
420 type Config = ();
421
422 async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
423 unreachable!()
424 }
425
426 fn execution_service(&self, service: execution::BoxService) -> execution::BoxService {
427 (self.0)(service)
428 }
429}
430
431#[async_trait::async_trait]
432impl<F> Plugin for SubgraphServicePlugin<F>
433where
434 F: 'static + Send + Sync + Fn(&str, subgraph::BoxService) -> subgraph::BoxService,
435{
436 type Config = ();
437
438 async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
439 unreachable!()
440 }
441
442 fn subgraph_service(
443 &self,
444 subgraph_name: &str,
445 service: subgraph::BoxService,
446 ) -> subgraph::BoxService {
447 (self.0)(subgraph_name, service)
448 }
449}
450
/// Plugin holding mock subgraph services keyed by subgraph name; its `subgraph_service`
/// substitutes the stored mock for any subgraph whose name has an entry.
#[derive(Default)]
pub struct MockedSubgraphs(pub(crate) HashMap<&'static str, MockSubgraph>);
454
455impl MockedSubgraphs {
456 pub fn insert(&mut self, name: &'static str, subgraph: MockSubgraph) {
458 self.0.insert(name, subgraph);
459 }
460}
461
462#[async_trait::async_trait]
463impl Plugin for MockedSubgraphs {
464 type Config = ();
465
466 async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
467 unreachable!()
468 }
469
470 fn subgraph_service(
471 &self,
472 subgraph_name: &str,
473 default: subgraph::BoxService,
474 ) -> subgraph::BoxService {
475 self.0
476 .get(subgraph_name)
477 .map(|service| service.clone().boxed())
478 .unwrap_or(default)
479 }
480}
481
482pub fn make_fake_batch(
509 input: http::Request<graphql::Request>,
510 op_from_to: Option<(&str, &str)>,
511) -> http::Request<crate::services::router::Body> {
512 input.map(|req| {
513 let mut new_req = req.clone();
515
516 if let Some((from, to)) = op_from_to {
521 if let Some(operation_name) = &req.operation_name {
522 if operation_name == from {
523 new_req.query = req.query.clone().map(|q| q.replace(from, to));
524 new_req.operation_name = Some(to.to_string());
525 }
526 }
527 }
528
529 let mut json_bytes_req = serde_json::to_vec(&req).unwrap();
530 let mut json_bytes_new_req = serde_json::to_vec(&new_req).unwrap();
531
532 let mut result = vec![b'['];
533 result.append(&mut json_bytes_req);
534 result.push(b',');
535 result.append(&mut json_bytes_new_req);
536 result.push(b']');
537 crate::services::router::Body::from(result)
538 })
539}
540
541#[tokio::test]
542async fn test_intercept_subgraph_network_requests() {
543 use futures::StreamExt;
544 let request = crate::services::supergraph::Request::canned_builder()
545 .build()
546 .unwrap();
547 let response = TestHarness::builder()
548 .schema(include_str!("../testing_schema.graphql"))
549 .configuration_json(serde_json::json!({
550 "include_subgraph_errors": {
551 "all": true
552 }
553 }))
554 .unwrap()
555 .build_router()
556 .await
557 .unwrap()
558 .oneshot(request.try_into().unwrap())
559 .await
560 .unwrap()
561 .into_graphql_response_stream()
562 .await
563 .next()
564 .await
565 .unwrap()
566 .unwrap();
567 insta::assert_json_snapshot!(response, @r###"
568 {
569 "data": {
570 "topProducts": null
571 },
572 "errors": [
573 {
574 "message": "subgraph mock not configured",
575 "path": [],
576 "extensions": {
577 "code": "SUBGRAPH_MOCK_NOT_CONFIGURED",
578 "service": "products"
579 }
580 }
581 ]
582 }
583 "###);
584}