1use std::sync::Arc;
2
3use apollo_compiler::ExecutableDocument;
4use apollo_compiler::validation::Valid;
5use apollo_federation::connectors::Connector;
6use apollo_federation::connectors::runtime::debug::ConnectorContext;
7use apollo_federation::connectors::runtime::debug::DebugRequest;
8use apollo_federation::connectors::runtime::debug::SelectionData;
9use apollo_federation::connectors::runtime::errors::Error;
10use apollo_federation::connectors::runtime::errors::RuntimeError;
11use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
12use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
13use apollo_federation::connectors::runtime::key::ResponseKey;
14use apollo_federation::connectors::runtime::mapping::Problem;
15use apollo_federation::connectors::runtime::responses::HandleResponseError;
16use apollo_federation::connectors::runtime::responses::MappedResponse;
17use apollo_federation::connectors::runtime::responses::deserialize_response;
18use apollo_federation::connectors::runtime::responses::handle_raw_response;
19use axum::body::HttpBody;
20use http::response::Parts;
21use http_body_util::BodyExt;
22use http_body_util::LengthLimitError;
23use http_body_util::Limited;
24use opentelemetry::KeyValue;
25use parking_lot::Mutex;
26use serde_json_bytes::Map;
27use serde_json_bytes::Value;
28use tracing::Span;
29
30use crate::Context;
31use crate::graphql;
32use crate::json_ext::Path;
33use crate::plugins::limits::ConnectorResponseSizeLimit;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
35use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
36use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
37use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
38use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
39use crate::plugins::telemetry::config_new::events::log_event;
40use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
41use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
42use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
43use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
44use crate::services::connect::Response;
45use crate::services::connector;
46use crate::services::fetch::AddSubgraphNameExt;
47
48impl From<RuntimeError> for graphql::Error {
51 fn from(error: RuntimeError) -> Self {
52 let path: Path = (&error.path).into();
53
54 let err = graphql::Error::builder()
55 .message(&error.message)
56 .extensions(error.extensions())
57 .extension_code(error.code())
58 .path(path)
59 .build();
60
61 if let Some(subgraph_name) = &error.subgraph_name {
62 err.with_subgraph_name(subgraph_name)
63 } else {
64 err
65 }
66 }
67}
68
69#[allow(clippy::too_many_arguments)]
72pub(crate) async fn process_response<T>(
73 result: Result<http::Response<T>, Error>,
74 response_key: ResponseKey,
75 connector: Arc<Connector>,
76 context: &Context,
77 debug_request: DebugRequest,
78 debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
79 supergraph_request: Arc<http::Request<crate::graphql::Request>>,
80 operation: Option<Arc<Valid<ExecutableDocument>>>,
81) -> connector::request_service::Response
82where
83 T: HttpBody,
84 T::Error: Into<tower::BoxError>,
85{
86 let (mapped_response, result) = match result {
87 Err(error) => {
89 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
90 (
91 MappedResponse::Error {
92 error: error.to_runtime_error(&connector, &response_key),
93 key: response_key,
94 problems: Vec::new(),
95 },
96 Err(error),
97 )
98 }
99 Ok(response) => {
100 let (parts, body) = response.into_parts();
101
102 let result = Ok(TransportResponse::Http(HttpResponse {
103 inner: parts.clone(),
104 }));
105
106 let make_err = |message: String, code: &str| -> Box<RuntimeError> {
107 let mut err = RuntimeError::new(message, &response_key);
108 err.subgraph_name = Some(connector.id.subgraph_name.clone());
109 err = err.with_code(code);
110 err.coordinate = Some(connector.id.coordinate());
111 err = err.extension(
112 "http",
113 Value::Object(Map::from_iter([(
114 "status".into(),
115 Value::Number(parts.status.as_u16().into()),
116 )])),
117 );
118 Box::new(err)
119 };
120
121 let make_invalid_response_err = || {
122 make_err(
123 "The server returned data in an unexpected format.".to_string(),
124 "CONNECTOR_RESPONSE_INVALID",
125 )
126 };
127
128 let make_limit_err = |limit: usize| {
129 make_err(
130 format!("connector response body exceeded limit of {limit} bytes"),
131 "CONNECTOR_RESPONSE_SIZE_LIMIT_EXCEEDED",
132 )
133 };
134
135 let response_size_limit = context
136 .extensions()
137 .with_lock(|e| e.get::<ConnectorResponseSizeLimit>().copied());
138
139 let body_result: Result<_, Box<RuntimeError>> = match response_size_limit {
140 Some(ConnectorResponseSizeLimit(limit)) => {
141 Limited::new(body, limit)
142 .collect()
143 .await
144 .map_err(|e| {
145 if e.downcast_ref::<LengthLimitError>().is_some() {
146 u64_counter!(
147 "apollo.router.limits.connector_response_size.exceeded",
148 "Number of connector responses aborted because they exceeded the configured response size limit",
149 1,
150 "connector.source" = connector.source_config_key()
151 );
152 tracing::Span::current()
153 .record("apollo.connector.response.aborted", "response_size_limit");
154 make_limit_err(limit)
155 } else {
156 make_invalid_response_err()
157 }
158 })
159 }
160 None => body
161 .collect()
162 .await
163 .map_err(|_| make_invalid_response_err()),
164 };
165
166 let deserialized_body = body_result.and_then(|body| {
167 let body = body.to_bytes();
168 let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
169 if let Some(debug_context) = debug_context {
170 debug_context.lock().push_invalid_response(
171 debug_request.0.clone(),
172 &parts,
173 &body,
174 &connector.error_settings,
175 debug_request.1.clone(),
176 );
177 }
178 make_invalid_response_err()
179 });
180 log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
181 raw
182 });
183
184 let mapped = match &deserialized_body {
188 Err(error) => MappedResponse::Error {
189 error: error.as_ref().clone(),
190 key: response_key,
191 problems: Vec::new(),
192 },
193 Ok(data) => handle_raw_response(
194 data,
195 &parts,
196 response_key,
197 &connector,
198 context,
199 supergraph_request.headers(),
200 )
201 .apply_operation(
202 operation
203 .as_ref()
204 .map(|arc_valid_doc| arc_valid_doc.as_ref().as_ref()),
205 &connector.schema_subtypes_map,
206 ),
207 };
208
209 if let Some(debug) = debug_context {
210 let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
211 debug_problems.extend(debug_request.1);
212
213 let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
214 Some(SelectionData {
215 source: connector.selection.to_string(),
216 transformed: key.selection().to_string(),
217 result: Some(data.clone()),
218 })
219 } else {
220 None
221 };
222
223 debug.lock().push_response(
224 debug_request.0,
225 &parts,
226 deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
227 selection_data,
228 &connector.error_settings,
229 debug_problems,
230 );
231 }
232 if matches!(mapped, MappedResponse::Data { .. }) {
233 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
234 } else {
235 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
236 }
237
238 (mapped, result)
239 }
240 };
241
242 if let MappedResponse::Error { ref error, .. } = mapped_response {
243 emit_error_event(error.code(), &error.message, Some((*error.path).into()));
244 }
245
246 connector::request_service::Response {
247 context: context.clone(),
248 transport_result: result,
249 mapped_response,
250 }
251}
252
253pub(crate) fn aggregate_responses(
254 responses: Vec<MappedResponse>,
255 _context: Context,
256) -> Result<Response, HandleResponseError> {
257 let mut data = serde_json_bytes::Map::new();
258 let mut errors = Vec::new();
259 let count = responses.len();
260
261 for mapped in responses {
262 mapped.add_to_data(&mut data, &mut errors, count)?;
263 }
264
265 let data = if data.is_empty() {
266 Value::Null
267 } else {
268 Value::Object(data)
269 };
270
271 Span::current().record(
272 OTEL_STATUS_CODE,
273 if errors.is_empty() {
274 OTEL_STATUS_CODE_OK
275 } else {
276 OTEL_STATUS_CODE_ERROR
277 },
278 );
279
280 Ok(Response {
281 response: http::Response::builder()
282 .body(
283 graphql::Response::builder()
284 .data(data)
285 .errors(errors.into_iter().map(|e| e.into()).collect())
286 .build(),
287 )
288 .unwrap(),
289 })
290}
291
292fn log_connectors_event(
293 context: &Context,
294 body: &[u8],
295 parts: &Parts,
296 response_key: ResponseKey,
297 connector: &Connector,
298) {
299 let log_response_level = context
300 .extensions()
301 .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
302 .and_then(|event| {
303 let response = connector::request_service::Response {
315 context: context.clone(),
316 transport_result: Ok(TransportResponse::Http(HttpResponse {
317 inner: parts.clone(),
318 })),
319 mapped_response: MappedResponse::Data {
320 data: Value::Null,
321 key: response_key,
322 problems: vec![],
323 },
324 };
325 if event.condition.evaluate_response(&response) {
326 Some(event.level)
327 } else {
328 None
329 }
330 });
331
332 if let Some(level) = log_response_level {
333 let mut attrs = Vec::with_capacity(4);
334 #[cfg(test)]
335 let headers = {
336 let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
337 .headers
338 .iter()
339 .map(|(name, val)| (name.to_string(), val.clone()))
340 .collect();
341 headers.sort_keys();
342 headers
343 };
344 #[cfg(not(test))]
345 let headers = &parts.headers;
346
347 attrs.push(KeyValue::new(
348 HTTP_RESPONSE_HEADERS,
349 opentelemetry::Value::String(format!("{headers:?}").into()),
350 ));
351 attrs.push(KeyValue::new(
352 HTTP_RESPONSE_STATUS,
353 opentelemetry::Value::String(format!("{}", parts.status).into()),
354 ));
355 attrs.push(KeyValue::new(
356 HTTP_RESPONSE_VERSION,
357 opentelemetry::Value::String(format!("{:?}", parts.version).into()),
358 ));
359 attrs.push(KeyValue::new(
360 HTTP_RESPONSE_BODY,
361 opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
362 ));
363
364 log_event(
365 level,
366 "connector.response",
367 attrs,
368 &format!("Response from connector {label:?}", label = connector.label),
369 );
370 }
371}
372
373#[cfg(test)]
374mod tests {
375 use std::sync::Arc;
376
377 use apollo_compiler::Schema;
378 use apollo_compiler::collections::IndexMap;
379 use apollo_compiler::name;
380 use apollo_compiler::response::JsonValue;
381 use apollo_federation::connectors::ConnectId;
382 use apollo_federation::connectors::ConnectSpec;
383 use apollo_federation::connectors::Connector;
384 use apollo_federation::connectors::ConnectorErrorsSettings;
385 use apollo_federation::connectors::EntityResolver;
386 use apollo_federation::connectors::HTTPMethod;
387 use apollo_federation::connectors::HttpJsonTransport;
388 use apollo_federation::connectors::JSONSelection;
389 use apollo_federation::connectors::Label;
390 use apollo_federation::connectors::Namespace;
391 use apollo_federation::connectors::runtime::inputs::RequestInputs;
392 use apollo_federation::connectors::runtime::key::ResponseKey;
393 use insta::assert_debug_snapshot;
394 use itertools::Itertools;
395 use serde_json_bytes::json;
396
397 use crate::Context;
398 use crate::graphql;
399 use crate::plugins::connectors::handle_responses::process_response;
400 use crate::services::router;
401 use crate::services::router::body::RouterBody;
402
403 #[tokio::test]
404 async fn test_handle_responses_root_fields() {
405 let connector = Arc::new(Connector {
406 spec: ConnectSpec::V0_1,
407 schema_subtypes_map: Default::default(),
408 id: ConnectId::new(
409 "subgraph_name".into(),
410 None,
411 name!(Query),
412 name!(hello),
413 None,
414 0,
415 ),
416 transport: Some(HttpJsonTransport {
417 source_template: "http://localhost/api".parse().ok(),
418 connect_template: "/path".parse().unwrap(),
419 ..Default::default()
420 }),
421 selection: JSONSelection::parse("$.data").unwrap(),
422 entity_resolver: None,
423 config: Default::default(),
424 max_requests: None,
425 batch_settings: None,
426 request_headers: Default::default(),
427 response_headers: Default::default(),
428 request_variable_keys: Default::default(),
429 response_variable_keys: Default::default(),
430 error_settings: Default::default(),
431 label: "test label".into(),
432 });
433
434 let response1: http::Response<RouterBody> = http::Response::builder()
435 .body(router::body::from_bytes(r#"{"data":"world"}"#))
436 .unwrap();
437 let response_key1 = ResponseKey::RootField {
438 name: "hello".to_string(),
439 inputs: Default::default(),
440 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
441 };
442
443 let response2 = http::Response::builder()
444 .body(router::body::from_bytes(r#"{"data":"world"}"#))
445 .unwrap();
446 let response_key2 = ResponseKey::RootField {
447 name: "hello2".to_string(),
448 inputs: Default::default(),
449 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
450 };
451
452 let supergraph_request = Arc::new(
453 http::Request::builder()
454 .body(graphql::Request::builder().build())
455 .unwrap(),
456 );
457
458 let res = super::aggregate_responses(
459 vec![
460 process_response(
461 Ok(response1),
462 response_key1,
463 connector.clone(),
464 &Context::default(),
465 (None, Default::default()),
466 None,
467 supergraph_request.clone(),
468 Default::default(),
469 )
470 .await
471 .mapped_response,
472 process_response(
473 Ok(response2),
474 response_key2,
475 connector,
476 &Context::default(),
477 (None, Default::default()),
478 None,
479 supergraph_request,
480 Default::default(),
481 )
482 .await
483 .mapped_response,
484 ],
485 Context::new(),
486 )
487 .unwrap();
488
489 assert_debug_snapshot!(res.response, @r#"
490 Response {
491 status: 200,
492 version: HTTP/1.1,
493 headers: {},
494 body: Response {
495 label: None,
496 data: Some(
497 Object({
498 "hello": String(
499 "world",
500 ),
501 "hello2": String(
502 "world",
503 ),
504 }),
505 ),
506 path: None,
507 errors: [],
508 extensions: {},
509 has_next: None,
510 subscribed: None,
511 created_at: None,
512 incremental: [],
513 },
514 }
515 "#);
516 }
517
518 #[tokio::test]
519 async fn test_handle_responses_entities() {
520 let connector = Arc::new(Connector {
521 spec: ConnectSpec::V0_1,
522 schema_subtypes_map: Default::default(),
523 id: ConnectId::new(
524 "subgraph_name".into(),
525 None,
526 name!(Query),
527 name!(user),
528 None,
529 0,
530 ),
531 transport: Some(HttpJsonTransport {
532 source_template: "http://localhost/api".parse().ok(),
533 connect_template: "/path".parse().unwrap(),
534 ..Default::default()
535 }),
536 selection: JSONSelection::parse("$.data { id }").unwrap(),
537 entity_resolver: Some(EntityResolver::Explicit),
538 config: Default::default(),
539 max_requests: None,
540 batch_settings: None,
541 request_headers: Default::default(),
542 response_headers: Default::default(),
543 request_variable_keys: Default::default(),
544 response_variable_keys: Default::default(),
545 error_settings: Default::default(),
546 label: "test label".into(),
547 });
548
549 let response1: http::Response<RouterBody> = http::Response::builder()
550 .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
551 .unwrap();
552 let response_key1 = ResponseKey::Entity {
553 index: 0,
554 inputs: Default::default(),
555 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
556 };
557
558 let response2 = http::Response::builder()
559 .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
560 .unwrap();
561 let response_key2 = ResponseKey::Entity {
562 index: 1,
563 inputs: Default::default(),
564 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
565 };
566
567 let supergraph_request = Arc::new(
568 http::Request::builder()
569 .body(graphql::Request::builder().build())
570 .unwrap(),
571 );
572
573 let res = super::aggregate_responses(
574 vec![
575 process_response(
576 Ok(response1),
577 response_key1,
578 connector.clone(),
579 &Context::default(),
580 (None, Default::default()),
581 None,
582 supergraph_request.clone(),
583 Default::default(),
584 )
585 .await
586 .mapped_response,
587 process_response(
588 Ok(response2),
589 response_key2,
590 connector,
591 &Context::default(),
592 (None, Default::default()),
593 None,
594 supergraph_request,
595 Default::default(),
596 )
597 .await
598 .mapped_response,
599 ],
600 Context::new(),
601 )
602 .unwrap();
603
604 assert_debug_snapshot!(res.response, @r#"
605 Response {
606 status: 200,
607 version: HTTP/1.1,
608 headers: {},
609 body: Response {
610 label: None,
611 data: Some(
612 Object({
613 "_entities": Array([
614 Object({
615 "id": String(
616 "1",
617 ),
618 }),
619 Object({
620 "id": String(
621 "2",
622 ),
623 }),
624 ]),
625 }),
626 ),
627 path: None,
628 errors: [],
629 extensions: {},
630 has_next: None,
631 subscribed: None,
632 created_at: None,
633 incremental: [],
634 },
635 }
636 "#);
637 }
638
639 #[tokio::test]
640 async fn test_handle_responses_batch() {
641 let connector = Arc::new(Connector {
642 spec: ConnectSpec::V0_2,
643 id: ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
644 schema_subtypes_map: Default::default(),
645 transport: Some(HttpJsonTransport {
646 source_template: "http://localhost/api".parse().ok(),
647 connect_template: "/path".parse().unwrap(),
648 method: HTTPMethod::Post,
649 body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
650 ..Default::default()
651 }),
652 selection: JSONSelection::parse("$.data { id name }").unwrap(),
653 entity_resolver: Some(EntityResolver::TypeBatch),
654 config: Default::default(),
655 max_requests: None,
656 batch_settings: None,
657 request_headers: Default::default(),
658 response_headers: Default::default(),
659 request_variable_keys: Default::default(),
660 response_variable_keys: Default::default(),
661 error_settings: Default::default(),
662 label: "test label".into(),
663 });
664
665 let keys = connector
666 .resolvable_key(
667 &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
668 .unwrap(),
669 )
670 .unwrap()
671 .unwrap();
672
673 let response1: http::Response<RouterBody> = http::Response::builder()
674 .body(router::body::from_bytes(
676 r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
677 ))
678 .unwrap();
679
680 let mut inputs: RequestInputs = RequestInputs::default();
681 let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
682 inputs.batch = representations
683 .as_array()
684 .unwrap()
685 .iter()
686 .map(|v| v.as_object().unwrap().clone())
687 .collect_vec();
688
689 let response_key1 = ResponseKey::BatchEntity {
690 selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
691 keys,
692 inputs,
693 };
694
695 let supergraph_request = Arc::new(
696 http::Request::builder()
697 .body(graphql::Request::builder().build())
698 .unwrap(),
699 );
700
701 let res = super::aggregate_responses(
702 vec![
703 process_response(
704 Ok(response1),
705 response_key1,
706 connector.clone(),
707 &Context::default(),
708 (None, Default::default()),
709 None,
710 supergraph_request,
711 Default::default(),
712 )
713 .await
714 .mapped_response,
715 ],
716 Context::new(),
717 )
718 .unwrap();
719
720 assert_debug_snapshot!(res.response, @r#"
721 Response {
722 status: 200,
723 version: HTTP/1.1,
724 headers: {},
725 body: Response {
726 label: None,
727 data: Some(
728 Object({
729 "_entities": Array([
730 Object({
731 "id": String(
732 "1",
733 ),
734 "name": String(
735 "A",
736 ),
737 }),
738 Object({
739 "id": String(
740 "2",
741 ),
742 "name": String(
743 "B",
744 ),
745 }),
746 ]),
747 }),
748 ),
749 path: None,
750 errors: [],
751 extensions: {},
752 has_next: None,
753 subscribed: None,
754 created_at: None,
755 incremental: [],
756 },
757 }
758 "#);
759 }
760
761 #[tokio::test]
762 async fn test_handle_responses_entity_field() {
763 let connector = Arc::new(Connector {
764 spec: ConnectSpec::V0_1,
765 schema_subtypes_map: Default::default(),
766 id: ConnectId::new(
767 "subgraph_name".into(),
768 None,
769 name!(User),
770 name!(field),
771 None,
772 0,
773 ),
774 transport: Some(HttpJsonTransport {
775 source_template: "http://localhost/api".parse().ok(),
776 connect_template: "/path".parse().unwrap(),
777 ..Default::default()
778 }),
779 selection: JSONSelection::parse("$.data").unwrap(),
780 entity_resolver: Some(EntityResolver::Implicit),
781 config: Default::default(),
782 max_requests: None,
783 batch_settings: None,
784 request_headers: Default::default(),
785 response_headers: Default::default(),
786 request_variable_keys: Default::default(),
787 response_variable_keys: Default::default(),
788 error_settings: Default::default(),
789 label: "test label".into(),
790 });
791
792 let response1: http::Response<RouterBody> = http::Response::builder()
793 .body(router::body::from_bytes(r#"{"data":"value1"}"#))
794 .unwrap();
795 let response_key1 = ResponseKey::EntityField {
796 index: 0,
797 inputs: Default::default(),
798 field_name: "field".to_string(),
799 typename: Some(name!("User")),
800 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
801 };
802
803 let response2 = http::Response::builder()
804 .body(router::body::from_bytes(r#"{"data":"value2"}"#))
805 .unwrap();
806 let response_key2 = ResponseKey::EntityField {
807 index: 1,
808 inputs: Default::default(),
809 field_name: "field".to_string(),
810 typename: Some(name!("User")),
811 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
812 };
813
814 let supergraph_request = Arc::new(
815 http::Request::builder()
816 .body(graphql::Request::builder().build())
817 .unwrap(),
818 );
819
820 let res = super::aggregate_responses(
821 vec![
822 process_response(
823 Ok(response1),
824 response_key1,
825 connector.clone(),
826 &Context::default(),
827 (None, Default::default()),
828 None,
829 supergraph_request.clone(),
830 Default::default(),
831 )
832 .await
833 .mapped_response,
834 process_response(
835 Ok(response2),
836 response_key2,
837 connector,
838 &Context::default(),
839 (None, Default::default()),
840 None,
841 supergraph_request,
842 Default::default(),
843 )
844 .await
845 .mapped_response,
846 ],
847 Context::new(),
848 )
849 .unwrap();
850
851 assert_debug_snapshot!(res.response, @r#"
852 Response {
853 status: 200,
854 version: HTTP/1.1,
855 headers: {},
856 body: Response {
857 label: None,
858 data: Some(
859 Object({
860 "_entities": Array([
861 Object({
862 "__typename": String(
863 "User",
864 ),
865 "field": String(
866 "value1",
867 ),
868 }),
869 Object({
870 "__typename": String(
871 "User",
872 ),
873 "field": String(
874 "value2",
875 ),
876 }),
877 ]),
878 }),
879 ),
880 path: None,
881 errors: [],
882 extensions: {},
883 has_next: None,
884 subscribed: None,
885 created_at: None,
886 incremental: [],
887 },
888 }
889 "#);
890 }
891
892 #[tokio::test]
893 async fn test_handle_responses_errors() {
894 let connector = Arc::new(Connector {
895 spec: ConnectSpec::V0_1,
896 schema_subtypes_map: Default::default(),
897 id: ConnectId::new(
898 "subgraph_name".into(),
899 None,
900 name!(Query),
901 name!(user),
902 None,
903 0,
904 ),
905 transport: Some(HttpJsonTransport {
906 source_template: "http://localhost/api".parse().ok(),
907 connect_template: "/path".parse().unwrap(),
908 ..Default::default()
909 }),
910 selection: JSONSelection::parse("$.data").unwrap(),
911 entity_resolver: Some(EntityResolver::Explicit),
912 config: Default::default(),
913 max_requests: None,
914 batch_settings: None,
915 request_headers: Default::default(),
916 response_headers: Default::default(),
917 request_variable_keys: Default::default(),
918 response_variable_keys: Default::default(),
919 error_settings: Default::default(),
920 label: "test label".into(),
921 });
922
923 let response_plaintext: http::Response<RouterBody> = http::Response::builder()
924 .body(router::body::from_bytes(r#"plain text"#))
925 .unwrap();
926 let response_key_plaintext = ResponseKey::Entity {
927 index: 0,
928 inputs: Default::default(),
929 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
930 };
931
932 let response1: http::Response<RouterBody> = http::Response::builder()
933 .status(404)
934 .body(router::body::from_bytes(r#"{"error":"not found"}"#))
935 .unwrap();
936 let response_key1 = ResponseKey::Entity {
937 index: 1,
938 inputs: Default::default(),
939 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
940 };
941
942 let response2 = http::Response::builder()
943 .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
944 .unwrap();
945 let response_key2 = ResponseKey::Entity {
946 index: 2,
947 inputs: Default::default(),
948 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
949 };
950
951 let response3: http::Response<RouterBody> = http::Response::builder()
952 .status(500)
953 .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
954 .unwrap();
955 let response_key3 = ResponseKey::Entity {
956 index: 3,
957 inputs: Default::default(),
958 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
959 };
960
961 let supergraph_request = Arc::new(
962 http::Request::builder()
963 .body(graphql::Request::builder().build())
964 .unwrap(),
965 );
966
967 let mut res = super::aggregate_responses(
968 vec![
969 process_response(
970 Ok(response_plaintext),
971 response_key_plaintext,
972 connector.clone(),
973 &Context::default(),
974 (None, Default::default()),
975 None,
976 supergraph_request.clone(),
977 Default::default(),
978 )
979 .await
980 .mapped_response,
981 process_response(
982 Ok(response1),
983 response_key1,
984 connector.clone(),
985 &Context::default(),
986 (None, Default::default()),
987 None,
988 supergraph_request.clone(),
989 Default::default(),
990 )
991 .await
992 .mapped_response,
993 process_response(
994 Ok(response2),
995 response_key2,
996 connector.clone(),
997 &Context::default(),
998 (None, Default::default()),
999 None,
1000 supergraph_request.clone(),
1001 Default::default(),
1002 )
1003 .await
1004 .mapped_response,
1005 process_response(
1006 Ok(response3),
1007 response_key3,
1008 connector,
1009 &Context::default(),
1010 (None, Default::default()),
1011 None,
1012 supergraph_request,
1013 Default::default(),
1014 )
1015 .await
1016 .mapped_response,
1017 ],
1018 Context::new(),
1019 )
1020 .unwrap();
1021
1022 let body = res.response.body_mut();
1026 body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();
1027
1028 assert_debug_snapshot!(res.response, @r#"
1029 Response {
1030 status: 200,
1031 version: HTTP/1.1,
1032 headers: {},
1033 body: Response {
1034 label: None,
1035 data: Some(
1036 Object({
1037 "_entities": Array([
1038 Null,
1039 Null,
1040 Object({
1041 "id": String(
1042 "2",
1043 ),
1044 }),
1045 Null,
1046 ]),
1047 }),
1048 ),
1049 path: None,
1050 errors: [
1051 Error {
1052 message: "The server returned data in an unexpected format.",
1053 locations: [],
1054 path: Some(
1055 Path(
1056 [
1057 Key(
1058 "_entities",
1059 None,
1060 ),
1061 Index(
1062 0,
1063 ),
1064 ],
1065 ),
1066 ),
1067 extensions: {
1068 "code": String(
1069 "CONNECTOR_RESPONSE_INVALID",
1070 ),
1071 "service": String(
1072 "subgraph_name",
1073 ),
1074 "connector": Object({
1075 "coordinate": String(
1076 "subgraph_name:Query.user[0]",
1077 ),
1078 }),
1079 "http": Object({
1080 "status": Number(200),
1081 }),
1082 "apollo.private.subgraph.name": String(
1083 "subgraph_name",
1084 ),
1085 },
1086 apollo_id: 00000000-0000-0000-0000-000000000000,
1087 },
1088 Error {
1089 message: "Request failed",
1090 locations: [],
1091 path: Some(
1092 Path(
1093 [
1094 Key(
1095 "_entities",
1096 None,
1097 ),
1098 Index(
1099 1,
1100 ),
1101 ],
1102 ),
1103 ),
1104 extensions: {
1105 "code": String(
1106 "CONNECTOR_FETCH",
1107 ),
1108 "service": String(
1109 "subgraph_name",
1110 ),
1111 "connector": Object({
1112 "coordinate": String(
1113 "subgraph_name:Query.user[0]",
1114 ),
1115 }),
1116 "http": Object({
1117 "status": Number(404),
1118 }),
1119 "apollo.private.subgraph.name": String(
1120 "subgraph_name",
1121 ),
1122 },
1123 apollo_id: 00000000-0000-0000-0000-000000000000,
1124 },
1125 Error {
1126 message: "Request failed",
1127 locations: [],
1128 path: Some(
1129 Path(
1130 [
1131 Key(
1132 "_entities",
1133 None,
1134 ),
1135 Index(
1136 3,
1137 ),
1138 ],
1139 ),
1140 ),
1141 extensions: {
1142 "code": String(
1143 "CONNECTOR_FETCH",
1144 ),
1145 "service": String(
1146 "subgraph_name",
1147 ),
1148 "connector": Object({
1149 "coordinate": String(
1150 "subgraph_name:Query.user[0]",
1151 ),
1152 }),
1153 "http": Object({
1154 "status": Number(500),
1155 }),
1156 "apollo.private.subgraph.name": String(
1157 "subgraph_name",
1158 ),
1159 },
1160 apollo_id: 00000000-0000-0000-0000-000000000000,
1161 },
1162 ],
1163 extensions: {},
1164 has_next: None,
1165 subscribed: None,
1166 created_at: None,
1167 incremental: [],
1168 },
1169 }
1170 "#);
1171 }
1172
1173 #[tokio::test]
1174 async fn test_handle_responses_status() {
1175 let selection = JSONSelection::parse("$status").unwrap();
1176 let connector = Arc::new(Connector {
1177 spec: ConnectSpec::V0_1,
1178 schema_subtypes_map: Default::default(),
1179 id: ConnectId::new(
1180 "subgraph_name".into(),
1181 None,
1182 name!(Query),
1183 name!(hello),
1184 None,
1185 0,
1186 ),
1187 transport: Some(HttpJsonTransport {
1188 source_template: "http://localhost/api".parse().ok(),
1189 connect_template: "/path".parse().unwrap(),
1190 ..Default::default()
1191 }),
1192 selection: selection.clone(),
1193 entity_resolver: None,
1194 config: Default::default(),
1195 max_requests: None,
1196 batch_settings: None,
1197 request_headers: Default::default(),
1198 response_headers: Default::default(),
1199 request_variable_keys: Default::default(),
1200 response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1201 error_settings: Default::default(),
1202 label: "test label".into(),
1203 });
1204
1205 let response1: http::Response<RouterBody> = http::Response::builder()
1206 .status(201)
1207 .body(router::body::from_bytes(r#"{}"#))
1208 .unwrap();
1209 let response_key1 = ResponseKey::RootField {
1210 name: "hello".to_string(),
1211 inputs: Default::default(),
1212 selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1213 };
1214
1215 let supergraph_request = Arc::new(
1216 http::Request::builder()
1217 .body(graphql::Request::builder().build())
1218 .unwrap(),
1219 );
1220
1221 let res = super::aggregate_responses(
1222 vec![
1223 process_response(
1224 Ok(response1),
1225 response_key1,
1226 connector,
1227 &Context::default(),
1228 (None, Default::default()),
1229 None,
1230 supergraph_request,
1231 Default::default(),
1232 )
1233 .await
1234 .mapped_response,
1235 ],
1236 Context::new(),
1237 )
1238 .unwrap();
1239
1240 assert_debug_snapshot!(res.response, @r#"
1241 Response {
1242 status: 200,
1243 version: HTTP/1.1,
1244 headers: {},
1245 body: Response {
1246 label: None,
1247 data: Some(
1248 Object({
1249 "hello": Number(201),
1250 }),
1251 ),
1252 path: None,
1253 errors: [],
1254 extensions: {},
1255 has_next: None,
1256 subscribed: None,
1257 created_at: None,
1258 incremental: [],
1259 },
1260 }
1261 "#);
1262 }
1263
1264 #[tokio::test]
1265 async fn test_handle_response_with_is_success() {
1266 let is_success = JSONSelection::parse("$status ->eq(400)").unwrap();
1267 let selection = JSONSelection::parse("$status").unwrap();
1268 let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
1269 message: Default::default(),
1270 source_extensions: Default::default(),
1271 connect_extensions: Default::default(),
1272 connect_is_success: Some(is_success.clone()),
1273 };
1274 let connector = Arc::new(Connector {
1275 spec: ConnectSpec::V0_1,
1276 schema_subtypes_map: Default::default(),
1277 id: ConnectId::new(
1278 "subgraph_name".into(),
1279 None,
1280 name!(Query),
1281 name!(hello),
1282 None,
1283 0,
1284 ),
1285 transport: Some(HttpJsonTransport {
1286 source_template: "http://localhost/api".parse().ok(),
1287 connect_template: "/path".parse().unwrap(),
1288 ..Default::default()
1289 }),
1290 selection: selection.clone(),
1291 entity_resolver: None,
1292 config: Default::default(),
1293 max_requests: None,
1294 batch_settings: None,
1295 request_headers: Default::default(),
1296 response_headers: Default::default(),
1297 request_variable_keys: Default::default(),
1298 response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1299 error_settings,
1300 label: Label::from("test label"),
1301 });
1302
1303 let response_fail: http::Response<RouterBody> = http::Response::builder()
1305 .status(201)
1306 .body(router::body::from_bytes(r#"{}"#))
1307 .unwrap();
1308 let response_fail_key = ResponseKey::RootField {
1309 name: "hello".to_string(),
1310 inputs: Default::default(),
1311 selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1312 };
1313
1314 let response_succeed: http::Response<RouterBody> = http::Response::builder()
1316 .status(400)
1317 .body(router::body::from_bytes(r#"{}"#))
1318 .unwrap();
1319 let response_succeed_key = ResponseKey::RootField {
1320 name: "hello".to_string(),
1321 inputs: Default::default(),
1322 selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1323 };
1324
1325 let supergraph_request = Arc::new(
1326 http::Request::builder()
1327 .body(graphql::Request::builder().build())
1328 .unwrap(),
1329 );
1330
1331 let res_expect_fail = super::aggregate_responses(
1333 vec![
1334 process_response(
1335 Ok(response_fail),
1336 response_fail_key,
1337 connector.clone(),
1338 &Context::default(),
1339 (None, Default::default()),
1340 None,
1341 supergraph_request.clone(),
1342 Default::default(),
1343 )
1344 .await
1345 .mapped_response,
1346 ],
1347 Context::new(),
1348 )
1349 .unwrap()
1350 .response;
1351 assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
1352 assert_eq!(res_expect_fail.body().errors.len(), 1);
1353
1354 let res_expect_success = super::aggregate_responses(
1356 vec![
1357 process_response(
1358 Ok(response_succeed),
1359 response_succeed_key,
1360 connector.clone(),
1361 &Context::default(),
1362 (None, Default::default()),
1363 None,
1364 supergraph_request.clone(),
1365 Default::default(),
1366 )
1367 .await
1368 .mapped_response,
1369 ],
1370 Context::new(),
1371 )
1372 .unwrap()
1373 .response;
1374 assert!(res_expect_success.body().errors.is_empty());
1375 assert_eq!(
1376 &res_expect_success.body().data,
1377 &Some(json!({"hello": json!(400)}))
1378 );
1379 }
1380
1381 fn make_connector() -> Arc<Connector> {
1382 Arc::new(Connector {
1383 spec: ConnectSpec::V0_1,
1384 schema_subtypes_map: Default::default(),
1385 id: ConnectId::new(
1386 "subgraph_name".into(),
1387 None,
1388 name!(Query),
1389 name!(hello),
1390 None,
1391 0,
1392 ),
1393 transport: Some(HttpJsonTransport {
1394 source_template: "http://localhost/api".parse().ok(),
1395 connect_template: "/path".parse().unwrap(),
1396 ..Default::default()
1397 }),
1398 selection: JSONSelection::parse("$.data").unwrap(),
1399 entity_resolver: None,
1400 config: Default::default(),
1401 max_requests: None,
1402 batch_settings: None,
1403 request_headers: Default::default(),
1404 response_headers: Default::default(),
1405 request_variable_keys: Default::default(),
1406 response_variable_keys: Default::default(),
1407 error_settings: Default::default(),
1408 label: "test label".into(),
1409 })
1410 }
1411
1412 fn make_supergraph_request() -> Arc<http::Request<graphql::Request>> {
1413 Arc::new(
1414 http::Request::builder()
1415 .body(graphql::Request::builder().build())
1416 .unwrap(),
1417 )
1418 }
1419
1420 #[tokio::test]
1421 async fn process_response_under_size_limit() {
1422 use crate::plugins::limits::ConnectorResponseSizeLimit;
1423
1424 let ctx = Context::new();
1425 ctx.extensions()
1426 .with_lock(|e| e.insert(ConnectorResponseSizeLimit(1000)));
1427
1428 let key = ResponseKey::RootField {
1429 name: "hello".to_string(),
1430 inputs: Default::default(),
1431 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1432 };
1433 let response = http::Response::builder()
1434 .body(router::body::from_bytes(r#"{"data":"world"}"#))
1435 .unwrap();
1436
1437 let result = process_response(
1438 Ok(response),
1439 key,
1440 make_connector(),
1441 &ctx,
1442 (None, Default::default()),
1443 None,
1444 make_supergraph_request(),
1445 Default::default(),
1446 )
1447 .await;
1448
1449 let graphql_response =
1450 super::aggregate_responses(vec![result.mapped_response], Context::new())
1451 .unwrap()
1452 .response;
1453 assert!(
1454 graphql_response.body().errors.is_empty(),
1455 "expected no errors when response is under the limit"
1456 );
1457 }
1458
1459 #[tokio::test]
1460 async fn process_response_exceeds_size_limit() {
1461 use crate::plugins::limits::ConnectorResponseSizeLimit;
1462
1463 let ctx = Context::new();
1464 ctx.extensions()
1466 .with_lock(|e| e.insert(ConnectorResponseSizeLimit(5)));
1467
1468 let key = ResponseKey::RootField {
1469 name: "hello".to_string(),
1470 inputs: Default::default(),
1471 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1472 };
1473 let response = http::Response::builder()
1474 .body(router::body::from_bytes(r#"{"data":"world"}"#))
1475 .unwrap();
1476
1477 let result = process_response(
1478 Ok(response),
1479 key,
1480 make_connector(),
1481 &ctx,
1482 (None, Default::default()),
1483 None,
1484 make_supergraph_request(),
1485 Default::default(),
1486 )
1487 .await;
1488
1489 let graphql_response =
1490 super::aggregate_responses(vec![result.mapped_response], Context::new())
1491 .unwrap()
1492 .response;
1493 let errors = &graphql_response.body().errors;
1494 assert!(!errors.is_empty(), "expected an error for exceeded limit");
1495 assert!(
1496 errors[0].message.contains("exceeded limit of 5 bytes"),
1497 "unexpected error message: {}",
1498 errors[0].message
1499 );
1500 }
1501}