1use std::sync::Arc;
2
3use apollo_federation::connectors::Connector;
4use apollo_federation::connectors::ProblemLocation;
5use apollo_federation::connectors::runtime::debug::ConnectorContext;
6use apollo_federation::connectors::runtime::debug::ConnectorDebugHttpRequest;
7use apollo_federation::connectors::runtime::errors::Error;
8use apollo_federation::connectors::runtime::errors::RuntimeError;
9use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
10use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
11use apollo_federation::connectors::runtime::key::ResponseKey;
12use apollo_federation::connectors::runtime::mapping::Problem;
13use apollo_federation::connectors::runtime::responses::HandleResponseError;
14use apollo_federation::connectors::runtime::responses::MappedResponse;
15use apollo_federation::connectors::runtime::responses::RawResponse;
16use apollo_federation::connectors::runtime::responses::handle_raw_response;
17use axum::body::HttpBody;
18use encoding_rs::Encoding;
19use encoding_rs::UTF_8;
20use http::header::CONTENT_LENGTH;
21use http::header::CONTENT_TYPE;
22use mime::Mime;
23use opentelemetry::KeyValue;
24use parking_lot::Mutex;
25use serde_json_bytes::Value;
26use tracing::Span;
27
28use crate::Context;
29use crate::graphql;
30use crate::json_ext::Path;
31use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
32use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
33use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
35use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
36use crate::plugins::telemetry::config_new::events::log_event;
37use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
38use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
39use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
40use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
41use crate::services::connect::Response;
42use crate::services::connector;
43use crate::services::fetch::AddSubgraphNameExt;
44use crate::services::router;
45
46impl From<RuntimeError> for graphql::Error {
49 fn from(error: RuntimeError) -> Self {
50 let path: Path = (&error.path).into();
51
52 let err = graphql::Error::builder()
53 .message(&error.message)
54 .extensions(error.extensions())
55 .extension_code(error.code())
56 .path(path)
57 .build();
58
59 if let Some(subgraph_name) = &error.subgraph_name {
60 err.with_subgraph_name(subgraph_name)
61 } else {
62 err
63 }
64 }
65}
66
67pub(crate) async fn process_response<T: HttpBody>(
70 result: Result<http::Response<T>, Error>,
71 response_key: ResponseKey,
72 connector: Arc<Connector>,
73 context: &Context,
74 debug_request: (
75 Option<Box<ConnectorDebugHttpRequest>>,
76 Vec<(ProblemLocation, Problem)>,
77 ),
78 debug_context: &Option<Arc<Mutex<ConnectorContext>>>,
79 supergraph_request: Arc<http::Request<crate::graphql::Request>>,
80) -> connector::request_service::Response {
81 let (mapped_response, result) = match result {
82 Err(error) => {
84 let raw = RawResponse::Error {
85 error: error.to_runtime_error(&connector, &response_key),
86 key: response_key,
87 };
88 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
89 (
90 raw.map_error(
91 &connector,
92 context,
93 debug_context,
94 supergraph_request.headers(),
95 ),
96 Err(error),
97 )
98 }
99 Ok(response) => {
100 let (parts, body) = response.into_parts();
101
102 let result = Ok(TransportResponse::Http(HttpResponse {
103 inner: parts.clone(),
104 }));
105
106 let raw = match deserialize_response(
110 body,
111 &parts,
112 connector.clone(),
113 context,
114 &response_key,
115 debug_context,
116 &debug_request,
117 )
118 .await
119 {
120 Ok(data) => RawResponse::Data {
121 parts,
122 data,
123 key: response_key,
124 debug_request,
125 },
126 Err(error) => RawResponse::Error {
127 error,
128 key: response_key,
129 },
130 };
131
132 let (mapped, is_success) = handle_raw_response(
133 raw,
134 &connector,
135 context,
136 debug_context,
137 supergraph_request.headers(),
138 );
139
140 if is_success {
141 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
142 } else {
143 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
144 };
145
146 (mapped, result)
147 }
148 };
149
150 if let MappedResponse::Error { ref error, .. } = mapped_response {
151 emit_error_event(error.code(), &error.message, Some((*error.path).into()));
152 }
153
154 connector::request_service::Response {
155 context: context.clone(),
156 connector: connector.clone(),
157 transport_result: result,
158 mapped_response,
159 }
160}
161
162pub(crate) fn aggregate_responses(
163 responses: Vec<MappedResponse>,
164) -> Result<Response, HandleResponseError> {
165 let mut data = serde_json_bytes::Map::new();
166 let mut errors = Vec::new();
167 let count = responses.len();
168
169 for mapped in responses {
170 mapped.add_to_data(&mut data, &mut errors, count)?;
171 }
172
173 let data = if data.is_empty() {
174 Value::Null
175 } else {
176 Value::Object(data)
177 };
178
179 Span::current().record(
180 OTEL_STATUS_CODE,
181 if errors.is_empty() {
182 OTEL_STATUS_CODE_OK
183 } else {
184 OTEL_STATUS_CODE_ERROR
185 },
186 );
187
188 Ok(Response {
189 response: http::Response::builder()
190 .body(
191 graphql::Response::builder()
192 .data(data)
193 .errors(errors.into_iter().map(|e| e.into()).collect())
194 .build(),
195 )
196 .unwrap(),
197 })
198}
199
200async fn deserialize_response<T: HttpBody>(
204 body: T,
205 parts: &http::response::Parts,
206 connector: Arc<Connector>,
207 context: &Context,
208 response_key: &ResponseKey,
209 debug_context: &Option<Arc<Mutex<ConnectorContext>>>,
210 debug_request: &(
211 Option<Box<ConnectorDebugHttpRequest>>,
212 Vec<(ProblemLocation, Problem)>,
213 ),
214) -> Result<Value, RuntimeError> {
215 use serde_json_bytes::*;
216
217 let make_err = || {
218 let mut err = RuntimeError::new(
219 "The server returned data in an unexpected format.".to_string(),
220 response_key,
221 );
222 err.subgraph_name = Some(connector.id.subgraph_name.clone());
223 err = err.with_code("CONNECTOR_RESPONSE_INVALID");
224 err.coordinate = Some(connector.id.coordinate());
225 err = err.extension(
226 "http",
227 Value::Object(Map::from_iter([(
228 "status".into(),
229 Value::Number(parts.status.as_u16().into()),
230 )])),
231 );
232 err
233 };
234
235 let body = &router::body::into_bytes(body)
236 .await
237 .map_err(|_| make_err())?;
238
239 let log_response_level = context
240 .extensions()
241 .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
242 .and_then(|event| {
243 let response = connector::request_service::Response {
254 context: context.clone(),
255 connector: connector.clone(),
256 transport_result: Ok(TransportResponse::Http(HttpResponse {
257 inner: parts.clone(),
258 })),
259 mapped_response: MappedResponse::Data {
260 data: Value::Null,
261 key: response_key.clone(),
262 problems: vec![],
263 },
264 };
265 if event.condition.evaluate_response(&response) {
266 Some(event.level)
267 } else {
268 None
269 }
270 });
271
272 if let Some(level) = log_response_level {
273 let mut attrs = Vec::with_capacity(4);
274 #[cfg(test)]
275 let headers = {
276 let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
277 .headers
278 .clone()
279 .into_iter()
280 .filter_map(|(name, val)| Some((name?.to_string(), val)))
281 .collect();
282 headers.sort_keys();
283 headers
284 };
285 #[cfg(not(test))]
286 let headers = &parts.headers;
287
288 attrs.push(KeyValue::new(
289 HTTP_RESPONSE_HEADERS,
290 opentelemetry::Value::String(format!("{:?}", headers).into()),
291 ));
292 attrs.push(KeyValue::new(
293 HTTP_RESPONSE_STATUS,
294 opentelemetry::Value::String(format!("{}", parts.status).into()),
295 ));
296 attrs.push(KeyValue::new(
297 HTTP_RESPONSE_VERSION,
298 opentelemetry::Value::String(format!("{:?}", parts.version).into()),
299 ));
300 attrs.push(KeyValue::new(
301 HTTP_RESPONSE_BODY,
302 opentelemetry::Value::String(
303 String::from_utf8(body.clone().to_vec())
304 .unwrap_or_default()
305 .into(),
306 ),
307 ));
308
309 log_event(
310 level,
311 "connector.response",
312 attrs,
313 &format!("Response from connector {label:?}", label = connector.label),
314 );
315 }
316
317 if let Some(content_length) = parts
319 .headers
320 .get(CONTENT_LENGTH)
321 .and_then(|len| len.to_str().ok())
322 .and_then(|s| s.parse::<usize>().ok())
323 {
324 if content_length == 0 {
325 return Ok(Value::Null);
326 }
327 }
328
329 let content_type = parts
330 .headers
331 .get(CONTENT_TYPE)
332 .and_then(|h| h.to_str().ok()?.parse::<Mime>().ok());
333
334 if content_type.is_none()
335 || content_type
336 .as_ref()
337 .is_some_and(|ct| ct.subtype() == mime::JSON || ct.suffix() == Some(mime::JSON))
338 {
339 match serde_json::from_slice::<Value>(body) {
342 Ok(json_data) => Ok(json_data),
343 Err(_) => {
344 if let Some(debug_context) = debug_context {
345 debug_context.lock().push_invalid_response(
346 debug_request.0.clone(),
347 parts,
348 body,
349 &connector.error_settings,
350 debug_request.1.clone(),
351 );
352 }
353 Err(make_err())
354 }
355 }
356 } else if content_type
357 .as_ref()
358 .is_some_and(|ct| ct.type_() == mime::TEXT && ct.subtype() == mime::PLAIN)
359 {
360 let encoding = content_type
363 .as_ref()
364 .and_then(|ct| Encoding::for_label(ct.get_param("charset")?.as_str().as_bytes()))
365 .unwrap_or(UTF_8);
366 let (decoded_body, _, had_errors) = encoding.decode(body);
367
368 if had_errors {
369 if let Some(debug_context) = debug_context {
370 debug_context.lock().push_invalid_response(
371 debug_request.0.clone(),
372 parts,
373 body,
374 &connector.error_settings,
375 debug_request.1.clone(),
376 );
377 }
378 return Err(make_err());
379 }
380
381 Ok(Value::String(decoded_body.into_owned().into()))
382 } else {
383 Ok(Value::Null)
385 }
386}
387
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use apollo_compiler::Schema;
    use apollo_compiler::collections::IndexMap;
    use apollo_compiler::name;
    use apollo_federation::connectors::ConnectId;
    use apollo_federation::connectors::ConnectSpec;
    use apollo_federation::connectors::Connector;
    use apollo_federation::connectors::EntityResolver;
    use apollo_federation::connectors::HTTPMethod;
    use apollo_federation::connectors::HttpJsonTransport;
    use apollo_federation::connectors::JSONSelection;
    use apollo_federation::connectors::Namespace;
    use apollo_federation::connectors::runtime::inputs::RequestInputs;
    use apollo_federation::connectors::runtime::key::ResponseKey;
    use insta::assert_debug_snapshot;
    use itertools::Itertools;

    use crate::Context;
    use crate::graphql;
    use crate::plugins::connectors::handle_responses::process_response;
    use crate::services::router;
    use crate::services::router::body::RouterBody;

    /// Transport used by every test connector unless individual fields are overridden.
    fn base_transport() -> HttpJsonTransport {
        HttpJsonTransport {
            source_template: "http://localhost/api".parse().ok(),
            connect_template: "/path".parse().unwrap(),
            ..Default::default()
        }
    }

    /// Builds a connector labelled `"test label"`; every field not passed in is left at the
    /// value the tests share (`None` or `Default::default()`).
    fn build_connector(
        spec: ConnectSpec,
        id: ConnectId,
        transport: HttpJsonTransport,
        selection: &str,
        entity_resolver: Option<EntityResolver>,
    ) -> Connector {
        Connector {
            spec,
            id,
            transport,
            selection: JSONSelection::parse(selection).unwrap(),
            entity_resolver,
            config: Default::default(),
            max_requests: None,
            batch_settings: None,
            request_headers: Default::default(),
            response_headers: Default::default(),
            request_variable_keys: Default::default(),
            response_variable_keys: Default::default(),
            error_settings: Default::default(),
            label: "test label".into(),
        }
    }

    /// Parses `source` into a shared selection for use in a response key.
    fn key_selection(source: &str) -> Arc<JSONSelection> {
        Arc::new(JSONSelection::parse(source).unwrap())
    }

    /// An HTTP response with the builder's default status and the given body.
    fn response_with_body(body: &'static str) -> http::Response<RouterBody> {
        http::Response::builder()
            .body(router::body::from_bytes(body))
            .unwrap()
    }

    /// An HTTP response with an explicit status code and the given body.
    fn response_with_status(status: u16, body: &'static str) -> http::Response<RouterBody> {
        http::Response::builder()
            .status(status)
            .body(router::body::from_bytes(body))
            .unwrap()
    }

    /// A supergraph request wrapping a default-built GraphQL request.
    fn empty_supergraph_request() -> Arc<http::Request<graphql::Request>> {
        Arc::new(
            http::Request::builder()
                .body(graphql::Request::builder().build())
                .unwrap(),
        )
    }

    /// Runs `process_response` on a successful transport result with a default context and no
    /// debugging, returning only the mapped response.
    async fn map_response(
        response: http::Response<RouterBody>,
        key: ResponseKey,
        connector: Arc<Connector>,
        supergraph_request: Arc<http::Request<graphql::Request>>,
    ) -> super::MappedResponse {
        process_response(
            Ok(response),
            key,
            connector,
            &Context::default(),
            (None, Default::default()),
            &None,
            supergraph_request,
        )
        .await
        .mapped_response
    }

    #[tokio::test]
    async fn test_handle_responses_root_fields() {
        let connector = Arc::new(build_connector(
            ConnectSpec::V0_1,
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(hello),
                None,
                0,
            ),
            base_transport(),
            "$.data",
            None,
        ));

        let first_key = ResponseKey::RootField {
            name: "hello".to_string(),
            inputs: Default::default(),
            selection: key_selection("$.data"),
        };
        let second_key = ResponseKey::RootField {
            name: "hello2".to_string(),
            inputs: Default::default(),
            selection: key_selection("$.data"),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(vec![
            map_response(
                response_with_body(r#"{"data":"world"}"#),
                first_key,
                connector.clone(),
                supergraph_request.clone(),
            )
            .await,
            map_response(
                response_with_body(r#"{"data":"world"}"#),
                second_key,
                connector,
                supergraph_request,
            )
            .await,
        ])
        .unwrap();

        assert_debug_snapshot!(res, @r###"
        Response {
            response: Response {
                status: 200,
                version: HTTP/1.1,
                headers: {},
                body: Response {
                    label: None,
                    data: Some(
                        Object({
                            "hello": String(
                                "world",
                            ),
                            "hello2": String(
                                "world",
                            ),
                        }),
                    ),
                    path: None,
                    errors: [],
                    extensions: {},
                    has_next: None,
                    subscribed: None,
                    created_at: None,
                    incremental: [],
                },
            },
        }
        "###);
    }

    #[tokio::test]
    async fn test_handle_responses_entities() {
        let connector = Arc::new(build_connector(
            ConnectSpec::V0_1,
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(user),
                None,
                0,
            ),
            base_transport(),
            "$.data { id }",
            Some(EntityResolver::Explicit),
        ));

        let first_key = ResponseKey::Entity {
            index: 0,
            inputs: Default::default(),
            selection: key_selection("$.data"),
        };
        let second_key = ResponseKey::Entity {
            index: 1,
            inputs: Default::default(),
            selection: key_selection("$.data"),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(vec![
            map_response(
                response_with_body(r#"{"data":{"id": "1"}}"#),
                first_key,
                connector.clone(),
                supergraph_request.clone(),
            )
            .await,
            map_response(
                response_with_body(r#"{"data":{"id": "2"}}"#),
                second_key,
                connector,
                supergraph_request,
            )
            .await,
        ])
        .unwrap();

        assert_debug_snapshot!(res, @r###"
        Response {
            response: Response {
                status: 200,
                version: HTTP/1.1,
                headers: {},
                body: Response {
                    label: None,
                    data: Some(
                        Object({
                            "_entities": Array([
                                Object({
                                    "id": String(
                                        "1",
                                    ),
                                }),
                                Object({
                                    "id": String(
                                        "2",
                                    ),
                                }),
                            ]),
                        }),
                    ),
                    path: None,
                    errors: [],
                    extensions: {},
                    has_next: None,
                    subscribed: None,
                    created_at: None,
                    incremental: [],
                },
            },
        }
        "###);
    }

    #[tokio::test]
    async fn test_handle_responses_batch() {
        let connector = Arc::new(build_connector(
            ConnectSpec::V0_2,
            ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
            HttpJsonTransport {
                method: HTTPMethod::Post,
                body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
                ..base_transport()
            },
            "$.data { id name }",
            Some(EntityResolver::TypeBatch),
        ));

        let keys = connector
            .resolvable_key(
                &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
                    .unwrap(),
            )
            .unwrap()
            .unwrap();

        // The response lists the entities in the opposite order to the batch inputs below.
        let response = response_with_body(
            r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
        );

        let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
        let mut inputs: RequestInputs = RequestInputs::default();
        inputs.batch = representations
            .as_array()
            .unwrap()
            .iter()
            .map(|v| v.as_object().unwrap().clone())
            .collect_vec();

        let batch_key = ResponseKey::BatchEntity {
            selection: key_selection("$.data { id name }"),
            keys,
            inputs,
        };

        let res = super::aggregate_responses(vec![
            map_response(
                response,
                batch_key,
                connector.clone(),
                empty_supergraph_request(),
            )
            .await,
        ])
        .unwrap();

        assert_debug_snapshot!(res, @r#"
        Response {
            response: Response {
                status: 200,
                version: HTTP/1.1,
                headers: {},
                body: Response {
                    label: None,
                    data: Some(
                        Object({
                            "_entities": Array([
                                Object({
                                    "id": String(
                                        "1",
                                    ),
                                    "name": String(
                                        "A",
                                    ),
                                }),
                                Object({
                                    "id": String(
                                        "2",
                                    ),
                                    "name": String(
                                        "B",
                                    ),
                                }),
                            ]),
                        }),
                    ),
                    path: None,
                    errors: [],
                    extensions: {},
                    has_next: None,
                    subscribed: None,
                    created_at: None,
                    incremental: [],
                },
            },
        }
        "#);
    }

    #[tokio::test]
    async fn test_handle_responses_entity_field() {
        let connector = Arc::new(build_connector(
            ConnectSpec::V0_1,
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(User),
                name!(field),
                None,
                0,
            ),
            base_transport(),
            "$.data",
            Some(EntityResolver::Implicit),
        ));

        let first_key = ResponseKey::EntityField {
            index: 0,
            inputs: Default::default(),
            field_name: "field".to_string(),
            typename: Some(name!("User")),
            selection: key_selection("$.data"),
        };
        let second_key = ResponseKey::EntityField {
            index: 1,
            inputs: Default::default(),
            field_name: "field".to_string(),
            typename: Some(name!("User")),
            selection: key_selection("$.data"),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(vec![
            map_response(
                response_with_body(r#"{"data":"value1"}"#),
                first_key,
                connector.clone(),
                supergraph_request.clone(),
            )
            .await,
            map_response(
                response_with_body(r#"{"data":"value2"}"#),
                second_key,
                connector,
                supergraph_request,
            )
            .await,
        ])
        .unwrap();

        assert_debug_snapshot!(res, @r###"
        Response {
            response: Response {
                status: 200,
                version: HTTP/1.1,
                headers: {},
                body: Response {
                    label: None,
                    data: Some(
                        Object({
                            "_entities": Array([
                                Object({
                                    "__typename": String(
                                        "User",
                                    ),
                                    "field": String(
                                        "value1",
                                    ),
                                }),
                                Object({
                                    "__typename": String(
                                        "User",
                                    ),
                                    "field": String(
                                        "value2",
                                    ),
                                }),
                            ]),
                        }),
                    ),
                    path: None,
                    errors: [],
                    extensions: {},
                    has_next: None,
                    subscribed: None,
                    created_at: None,
                    incremental: [],
                },
            },
        }
        "###);
    }

    #[tokio::test]
    async fn test_handle_responses_errors() {
        let connector = Arc::new(build_connector(
            ConnectSpec::V0_1,
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(user),
                None,
                0,
            ),
            base_transport(),
            "$.data",
            Some(EntityResolver::Explicit),
        ));

        // Entity 0: a body that is not JSON. Entities 1 and 3: 404 and 500 responses.
        // Entity 2: a response carrying data.
        let plaintext_key = ResponseKey::Entity {
            index: 0,
            inputs: Default::default(),
            selection: key_selection("$.data"),
        };
        let not_found_key = ResponseKey::Entity {
            index: 1,
            inputs: Default::default(),
            selection: key_selection("$.data"),
        };
        let data_key = ResponseKey::Entity {
            index: 2,
            inputs: Default::default(),
            selection: key_selection("$.data"),
        };
        let server_error_key = ResponseKey::Entity {
            index: 3,
            inputs: Default::default(),
            selection: key_selection("$.data"),
        };

        let supergraph_request = empty_supergraph_request();

        let mut res = super::aggregate_responses(vec![
            map_response(
                response_with_body(r#"plain text"#),
                plaintext_key,
                connector.clone(),
                supergraph_request.clone(),
            )
            .await,
            map_response(
                response_with_status(404, r#"{"error":"not found"}"#),
                not_found_key,
                connector.clone(),
                supergraph_request.clone(),
            )
            .await,
            map_response(
                response_with_body(r#"{"data":{"id":"2"}}"#),
                data_key,
                connector.clone(),
                supergraph_request.clone(),
            )
            .await,
            map_response(
                response_with_status(500, r#"{"error":"whoops"}"#),
                server_error_key,
                connector,
                supergraph_request,
            )
            .await,
        ])
        .unwrap();

        // Swap each error for its `with_null_id` form; the snapshot below expects the
        // all-zero `apollo_id`.
        let body = res.response.body_mut();
        body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();

        assert_debug_snapshot!(res, @r#"
        Response {
            response: Response {
                status: 200,
                version: HTTP/1.1,
                headers: {},
                body: Response {
                    label: None,
                    data: Some(
                        Object({
                            "_entities": Array([
                                Null,
                                Null,
                                Object({
                                    "id": String(
                                        "2",
                                    ),
                                }),
                                Null,
                            ]),
                        }),
                    ),
                    path: None,
                    errors: [
                        Error {
                            message: "The server returned data in an unexpected format.",
                            locations: [],
                            path: Some(
                                Path(
                                    [
                                        Key(
                                            "_entities",
                                            None,
                                        ),
                                        Index(
                                            0,
                                        ),
                                    ],
                                ),
                            ),
                            extensions: {
                                "code": String(
                                    "CONNECTOR_RESPONSE_INVALID",
                                ),
                                "service": String(
                                    "subgraph_name",
                                ),
                                "connector": Object({
                                    "coordinate": String(
                                        "subgraph_name:Query.user@connect[0]",
                                    ),
                                }),
                                "http": Object({
                                    "status": Number(200),
                                }),
                                "apollo.private.subgraph.name": String(
                                    "subgraph_name",
                                ),
                            },
                            apollo_id: 00000000-0000-0000-0000-000000000000,
                        },
                        Error {
                            message: "Request failed",
                            locations: [],
                            path: Some(
                                Path(
                                    [
                                        Key(
                                            "_entities",
                                            None,
                                        ),
                                        Index(
                                            1,
                                        ),
                                    ],
                                ),
                            ),
                            extensions: {
                                "code": String(
                                    "CONNECTOR_FETCH",
                                ),
                                "service": String(
                                    "subgraph_name",
                                ),
                                "connector": Object({
                                    "coordinate": String(
                                        "subgraph_name:Query.user@connect[0]",
                                    ),
                                }),
                                "http": Object({
                                    "status": Number(404),
                                }),
                                "apollo.private.subgraph.name": String(
                                    "subgraph_name",
                                ),
                            },
                            apollo_id: 00000000-0000-0000-0000-000000000000,
                        },
                        Error {
                            message: "Request failed",
                            locations: [],
                            path: Some(
                                Path(
                                    [
                                        Key(
                                            "_entities",
                                            None,
                                        ),
                                        Index(
                                            3,
                                        ),
                                    ],
                                ),
                            ),
                            extensions: {
                                "code": String(
                                    "CONNECTOR_FETCH",
                                ),
                                "service": String(
                                    "subgraph_name",
                                ),
                                "connector": Object({
                                    "coordinate": String(
                                        "subgraph_name:Query.user@connect[0]",
                                    ),
                                }),
                                "http": Object({
                                    "status": Number(500),
                                }),
                                "apollo.private.subgraph.name": String(
                                    "subgraph_name",
                                ),
                            },
                            apollo_id: 00000000-0000-0000-0000-000000000000,
                        },
                    ],
                    extensions: {},
                    has_next: None,
                    subscribed: None,
                    created_at: None,
                    incremental: [],
                },
            },
        }
        "#);
    }

    #[tokio::test]
    async fn test_handle_responses_status() {
        let mut connector = build_connector(
            ConnectSpec::V0_1,
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(hello),
                None,
                0,
            ),
            base_transport(),
            "$status",
            None,
        );
        // Unlike the other tests, this connector lists `Namespace::Status` among its
        // response variable keys.
        connector.response_variable_keys =
            IndexMap::from_iter([(Namespace::Status, Default::default())]);
        let connector = Arc::new(connector);

        let status_key = ResponseKey::RootField {
            name: "hello".to_string(),
            inputs: Default::default(),
            selection: key_selection("$status"),
        };

        let res = super::aggregate_responses(vec![
            map_response(
                response_with_status(201, r#"{}"#),
                status_key,
                connector,
                empty_supergraph_request(),
            )
            .await,
        ])
        .unwrap();

        assert_debug_snapshot!(res, @r###"
        Response {
            response: Response {
                status: 200,
                version: HTTP/1.1,
                headers: {},
                body: Response {
                    label: None,
                    data: Some(
                        Object({
                            "hello": Number(201),
                        }),
                    ),
                    path: None,
                    errors: [],
                    extensions: {},
                    has_next: None,
                    subscribed: None,
                    created_at: None,
                    incremental: [],
                },
            },
        }
        "###);
    }
}