1use std::sync::Arc;
2
3use apollo_federation::connectors::Connector;
4use apollo_federation::connectors::runtime::debug::ConnectorContext;
5use apollo_federation::connectors::runtime::debug::DebugRequest;
6use apollo_federation::connectors::runtime::debug::SelectionData;
7use apollo_federation::connectors::runtime::errors::Error;
8use apollo_federation::connectors::runtime::errors::RuntimeError;
9use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
10use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
11use apollo_federation::connectors::runtime::key::ResponseKey;
12use apollo_federation::connectors::runtime::mapping::Problem;
13use apollo_federation::connectors::runtime::responses::HandleResponseError;
14use apollo_federation::connectors::runtime::responses::MappedResponse;
15use apollo_federation::connectors::runtime::responses::deserialize_response;
16use apollo_federation::connectors::runtime::responses::handle_raw_response;
17use axum::body::HttpBody;
18use http::response::Parts;
19use http_body_util::BodyExt;
20use opentelemetry::KeyValue;
21use parking_lot::Mutex;
22use serde_json_bytes::Map;
23use serde_json_bytes::Value;
24use tracing::Span;
25
26use crate::Context;
27use crate::graphql;
28use crate::json_ext::Path;
29use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
30use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
31use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
32use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
33use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
34use crate::plugins::telemetry::config_new::events::log_event;
35use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
36use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
37use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
38use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
39use crate::services::connect::Response;
40use crate::services::connector;
41use crate::services::fetch::AddSubgraphNameExt;
42
43impl From<RuntimeError> for graphql::Error {
46 fn from(error: RuntimeError) -> Self {
47 let path: Path = (&error.path).into();
48
49 let err = graphql::Error::builder()
50 .message(&error.message)
51 .extensions(error.extensions())
52 .extension_code(error.code())
53 .path(path)
54 .build();
55
56 if let Some(subgraph_name) = &error.subgraph_name {
57 err.with_subgraph_name(subgraph_name)
58 } else {
59 err
60 }
61 }
62}
63
64pub(crate) async fn process_response<T: HttpBody>(
67 result: Result<http::Response<T>, Error>,
68 response_key: ResponseKey,
69 connector: Arc<Connector>,
70 context: &Context,
71 debug_request: DebugRequest,
72 debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
73 supergraph_request: Arc<http::Request<crate::graphql::Request>>,
74) -> connector::request_service::Response {
75 let (mapped_response, result) = match result {
76 Err(error) => {
78 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
79 (
80 MappedResponse::Error {
81 error: error.to_runtime_error(&connector, &response_key),
82 key: response_key,
83 problems: Vec::new(),
84 },
85 Err(error),
86 )
87 }
88 Ok(response) => {
89 let (parts, body) = response.into_parts();
90
91 let result = Ok(TransportResponse::Http(HttpResponse {
92 inner: parts.clone(),
93 }));
94
95 let make_err = || {
96 let mut err = RuntimeError::new(
97 "The server returned data in an unexpected format.".to_string(),
98 &response_key,
99 );
100 err.subgraph_name = Some(connector.id.subgraph_name.clone());
101 err = err.with_code("CONNECTOR_RESPONSE_INVALID");
102 err.coordinate = Some(connector.id.coordinate());
103 err = err.extension(
104 "http",
105 Value::Object(Map::from_iter([(
106 "status".into(),
107 Value::Number(parts.status.as_u16().into()),
108 )])),
109 );
110 err
111 };
112
113 let deserialized_body = body
114 .collect()
115 .await
116 .map_err(|_| ())
117 .and_then(|body| {
118 let body = body.to_bytes();
119 let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
120 if let Some(debug_context) = debug_context {
121 debug_context.lock().push_invalid_response(
122 debug_request.0.clone(),
123 &parts,
124 &body,
125 &connector.error_settings,
126 debug_request.1.clone(),
127 );
128 }
129 });
130 log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
131 raw
132 })
133 .map_err(|()| make_err());
134
135 let mapped = match &deserialized_body {
139 Err(error) => MappedResponse::Error {
140 error: error.clone(),
141 key: response_key,
142 problems: Vec::new(),
143 },
144 Ok(data) => handle_raw_response(
145 data,
146 &parts,
147 response_key,
148 &connector,
149 context,
150 supergraph_request.headers(),
151 ),
152 };
153
154 if let Some(debug) = debug_context {
155 let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
156 debug_problems.extend(debug_request.1);
157
158 let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
159 Some(SelectionData {
160 source: connector.selection.to_string(),
161 transformed: key.selection().to_string(),
162 result: Some(data.clone()),
163 })
164 } else {
165 None
166 };
167
168 debug.lock().push_response(
169 debug_request.0,
170 &parts,
171 deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
172 selection_data,
173 &connector.error_settings,
174 debug_problems,
175 );
176 }
177 if matches!(mapped, MappedResponse::Data { .. }) {
178 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
179 } else {
180 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
181 }
182
183 (mapped, result)
184 }
185 };
186
187 if let MappedResponse::Error { ref error, .. } = mapped_response {
188 emit_error_event(error.code(), &error.message, Some((*error.path).into()));
189 }
190
191 connector::request_service::Response {
192 transport_result: result,
193 mapped_response,
194 }
195}
196
197pub(crate) fn aggregate_responses(
198 responses: Vec<MappedResponse>,
199) -> Result<Response, HandleResponseError> {
200 let mut data = serde_json_bytes::Map::new();
201 let mut errors = Vec::new();
202 let count = responses.len();
203
204 for mapped in responses {
205 mapped.add_to_data(&mut data, &mut errors, count)?;
206 }
207
208 let data = if data.is_empty() {
209 Value::Null
210 } else {
211 Value::Object(data)
212 };
213
214 Span::current().record(
215 OTEL_STATUS_CODE,
216 if errors.is_empty() {
217 OTEL_STATUS_CODE_OK
218 } else {
219 OTEL_STATUS_CODE_ERROR
220 },
221 );
222
223 Ok(Response {
224 response: http::Response::builder()
225 .body(
226 graphql::Response::builder()
227 .data(data)
228 .errors(errors.into_iter().map(|e| e.into()).collect())
229 .build(),
230 )
231 .unwrap(),
232 })
233}
234
235fn log_connectors_event(
236 context: &Context,
237 body: &[u8],
238 parts: &Parts,
239 response_key: ResponseKey,
240 connector: &Connector,
241) {
242 let log_response_level = context
243 .extensions()
244 .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
245 .and_then(|event| {
246 let response = connector::request_service::Response {
258 transport_result: Ok(TransportResponse::Http(HttpResponse {
259 inner: parts.clone(),
260 })),
261 mapped_response: MappedResponse::Data {
262 data: Value::Null,
263 key: response_key,
264 problems: vec![],
265 },
266 };
267 if event.condition.evaluate_response(&response) {
268 Some(event.level)
269 } else {
270 None
271 }
272 });
273
274 if let Some(level) = log_response_level {
275 let mut attrs = Vec::with_capacity(4);
276 #[cfg(test)]
277 let headers = {
278 let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
279 .headers
280 .iter()
281 .map(|(name, val)| (name.to_string(), val.clone()))
282 .collect();
283 headers.sort_keys();
284 headers
285 };
286 #[cfg(not(test))]
287 let headers = &parts.headers;
288
289 attrs.push(KeyValue::new(
290 HTTP_RESPONSE_HEADERS,
291 opentelemetry::Value::String(format!("{headers:?}").into()),
292 ));
293 attrs.push(KeyValue::new(
294 HTTP_RESPONSE_STATUS,
295 opentelemetry::Value::String(format!("{}", parts.status).into()),
296 ));
297 attrs.push(KeyValue::new(
298 HTTP_RESPONSE_VERSION,
299 opentelemetry::Value::String(format!("{:?}", parts.version).into()),
300 ));
301 attrs.push(KeyValue::new(
302 HTTP_RESPONSE_BODY,
303 opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
304 ));
305
306 log_event(
307 level,
308 "connector.response",
309 attrs,
310 &format!("Response from connector {label:?}", label = connector.label),
311 );
312 }
313}
314
315#[cfg(test)]
316mod tests {
317 use std::sync::Arc;
318
319 use apollo_compiler::Schema;
320 use apollo_compiler::collections::IndexMap;
321 use apollo_compiler::name;
322 use apollo_compiler::response::JsonValue;
323 use apollo_federation::connectors::ConnectId;
324 use apollo_federation::connectors::ConnectSpec;
325 use apollo_federation::connectors::Connector;
326 use apollo_federation::connectors::ConnectorErrorsSettings;
327 use apollo_federation::connectors::EntityResolver;
328 use apollo_federation::connectors::HTTPMethod;
329 use apollo_federation::connectors::HttpJsonTransport;
330 use apollo_federation::connectors::JSONSelection;
331 use apollo_federation::connectors::Label;
332 use apollo_federation::connectors::Namespace;
333 use apollo_federation::connectors::runtime::inputs::RequestInputs;
334 use apollo_federation::connectors::runtime::key::ResponseKey;
335 use insta::assert_debug_snapshot;
336 use itertools::Itertools;
337 use serde_json_bytes::json;
338
339 use crate::Context;
340 use crate::graphql;
341 use crate::plugins::connectors::handle_responses::process_response;
342 use crate::services::router;
343 use crate::services::router::body::RouterBody;
344
345 #[tokio::test]
346 async fn test_handle_responses_root_fields() {
347 let connector = Arc::new(Connector {
348 spec: ConnectSpec::V0_1,
349 id: ConnectId::new(
350 "subgraph_name".into(),
351 None,
352 name!(Query),
353 name!(hello),
354 None,
355 0,
356 ),
357 transport: HttpJsonTransport {
358 source_template: "http://localhost/api".parse().ok(),
359 connect_template: "/path".parse().unwrap(),
360 ..Default::default()
361 },
362 selection: JSONSelection::parse("$.data").unwrap(),
363 entity_resolver: None,
364 config: Default::default(),
365 max_requests: None,
366 batch_settings: None,
367 request_headers: Default::default(),
368 response_headers: Default::default(),
369 request_variable_keys: Default::default(),
370 response_variable_keys: Default::default(),
371 error_settings: Default::default(),
372 label: "test label".into(),
373 });
374
375 let response1: http::Response<RouterBody> = http::Response::builder()
376 .body(router::body::from_bytes(r#"{"data":"world"}"#))
377 .unwrap();
378 let response_key1 = ResponseKey::RootField {
379 name: "hello".to_string(),
380 inputs: Default::default(),
381 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
382 };
383
384 let response2 = http::Response::builder()
385 .body(router::body::from_bytes(r#"{"data":"world"}"#))
386 .unwrap();
387 let response_key2 = ResponseKey::RootField {
388 name: "hello2".to_string(),
389 inputs: Default::default(),
390 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
391 };
392
393 let supergraph_request = Arc::new(
394 http::Request::builder()
395 .body(graphql::Request::builder().build())
396 .unwrap(),
397 );
398
399 let res = super::aggregate_responses(vec![
400 process_response(
401 Ok(response1),
402 response_key1,
403 connector.clone(),
404 &Context::default(),
405 (None, Default::default()),
406 None,
407 supergraph_request.clone(),
408 )
409 .await
410 .mapped_response,
411 process_response(
412 Ok(response2),
413 response_key2,
414 connector,
415 &Context::default(),
416 (None, Default::default()),
417 None,
418 supergraph_request,
419 )
420 .await
421 .mapped_response,
422 ])
423 .unwrap();
424
425 assert_debug_snapshot!(res, @r###"
426 Response {
427 response: Response {
428 status: 200,
429 version: HTTP/1.1,
430 headers: {},
431 body: Response {
432 label: None,
433 data: Some(
434 Object({
435 "hello": String(
436 "world",
437 ),
438 "hello2": String(
439 "world",
440 ),
441 }),
442 ),
443 path: None,
444 errors: [],
445 extensions: {},
446 has_next: None,
447 subscribed: None,
448 created_at: None,
449 incremental: [],
450 },
451 },
452 }
453 "###);
454 }
455
456 #[tokio::test]
457 async fn test_handle_responses_entities() {
458 let connector = Arc::new(Connector {
459 spec: ConnectSpec::V0_1,
460 id: ConnectId::new(
461 "subgraph_name".into(),
462 None,
463 name!(Query),
464 name!(user),
465 None,
466 0,
467 ),
468 transport: HttpJsonTransport {
469 source_template: "http://localhost/api".parse().ok(),
470 connect_template: "/path".parse().unwrap(),
471 ..Default::default()
472 },
473 selection: JSONSelection::parse("$.data { id }").unwrap(),
474 entity_resolver: Some(EntityResolver::Explicit),
475 config: Default::default(),
476 max_requests: None,
477 batch_settings: None,
478 request_headers: Default::default(),
479 response_headers: Default::default(),
480 request_variable_keys: Default::default(),
481 response_variable_keys: Default::default(),
482 error_settings: Default::default(),
483 label: "test label".into(),
484 });
485
486 let response1: http::Response<RouterBody> = http::Response::builder()
487 .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
488 .unwrap();
489 let response_key1 = ResponseKey::Entity {
490 index: 0,
491 inputs: Default::default(),
492 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
493 };
494
495 let response2 = http::Response::builder()
496 .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
497 .unwrap();
498 let response_key2 = ResponseKey::Entity {
499 index: 1,
500 inputs: Default::default(),
501 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
502 };
503
504 let supergraph_request = Arc::new(
505 http::Request::builder()
506 .body(graphql::Request::builder().build())
507 .unwrap(),
508 );
509
510 let res = super::aggregate_responses(vec![
511 process_response(
512 Ok(response1),
513 response_key1,
514 connector.clone(),
515 &Context::default(),
516 (None, Default::default()),
517 None,
518 supergraph_request.clone(),
519 )
520 .await
521 .mapped_response,
522 process_response(
523 Ok(response2),
524 response_key2,
525 connector,
526 &Context::default(),
527 (None, Default::default()),
528 None,
529 supergraph_request,
530 )
531 .await
532 .mapped_response,
533 ])
534 .unwrap();
535
536 assert_debug_snapshot!(res, @r###"
537 Response {
538 response: Response {
539 status: 200,
540 version: HTTP/1.1,
541 headers: {},
542 body: Response {
543 label: None,
544 data: Some(
545 Object({
546 "_entities": Array([
547 Object({
548 "id": String(
549 "1",
550 ),
551 }),
552 Object({
553 "id": String(
554 "2",
555 ),
556 }),
557 ]),
558 }),
559 ),
560 path: None,
561 errors: [],
562 extensions: {},
563 has_next: None,
564 subscribed: None,
565 created_at: None,
566 incremental: [],
567 },
568 },
569 }
570 "###);
571 }
572
573 #[tokio::test]
574 async fn test_handle_responses_batch() {
575 let connector = Arc::new(Connector {
576 spec: ConnectSpec::V0_2,
577 id: ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
578 transport: HttpJsonTransport {
579 source_template: "http://localhost/api".parse().ok(),
580 connect_template: "/path".parse().unwrap(),
581 method: HTTPMethod::Post,
582 body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
583 ..Default::default()
584 },
585 selection: JSONSelection::parse("$.data { id name }").unwrap(),
586 entity_resolver: Some(EntityResolver::TypeBatch),
587 config: Default::default(),
588 max_requests: None,
589 batch_settings: None,
590 request_headers: Default::default(),
591 response_headers: Default::default(),
592 request_variable_keys: Default::default(),
593 response_variable_keys: Default::default(),
594 error_settings: Default::default(),
595 label: "test label".into(),
596 });
597
598 let keys = connector
599 .resolvable_key(
600 &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
601 .unwrap(),
602 )
603 .unwrap()
604 .unwrap();
605
606 let response1: http::Response<RouterBody> = http::Response::builder()
607 .body(router::body::from_bytes(
609 r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
610 ))
611 .unwrap();
612
613 let mut inputs: RequestInputs = RequestInputs::default();
614 let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
615 inputs.batch = representations
616 .as_array()
617 .unwrap()
618 .iter()
619 .cloned()
620 .map(|v| v.as_object().unwrap().clone())
621 .collect_vec();
622
623 let response_key1 = ResponseKey::BatchEntity {
624 selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
625 keys,
626 inputs,
627 };
628
629 let supergraph_request = Arc::new(
630 http::Request::builder()
631 .body(graphql::Request::builder().build())
632 .unwrap(),
633 );
634
635 let res = super::aggregate_responses(vec![
636 process_response(
637 Ok(response1),
638 response_key1,
639 connector.clone(),
640 &Context::default(),
641 (None, Default::default()),
642 None,
643 supergraph_request,
644 )
645 .await
646 .mapped_response,
647 ])
648 .unwrap();
649
650 assert_debug_snapshot!(res, @r#"
651 Response {
652 response: Response {
653 status: 200,
654 version: HTTP/1.1,
655 headers: {},
656 body: Response {
657 label: None,
658 data: Some(
659 Object({
660 "_entities": Array([
661 Object({
662 "id": String(
663 "1",
664 ),
665 "name": String(
666 "A",
667 ),
668 }),
669 Object({
670 "id": String(
671 "2",
672 ),
673 "name": String(
674 "B",
675 ),
676 }),
677 ]),
678 }),
679 ),
680 path: None,
681 errors: [],
682 extensions: {},
683 has_next: None,
684 subscribed: None,
685 created_at: None,
686 incremental: [],
687 },
688 },
689 }
690 "#);
691 }
692
693 #[tokio::test]
694 async fn test_handle_responses_entity_field() {
695 let connector = Arc::new(Connector {
696 spec: ConnectSpec::V0_1,
697 id: ConnectId::new(
698 "subgraph_name".into(),
699 None,
700 name!(User),
701 name!(field),
702 None,
703 0,
704 ),
705 transport: HttpJsonTransport {
706 source_template: "http://localhost/api".parse().ok(),
707 connect_template: "/path".parse().unwrap(),
708 ..Default::default()
709 },
710 selection: JSONSelection::parse("$.data").unwrap(),
711 entity_resolver: Some(EntityResolver::Implicit),
712 config: Default::default(),
713 max_requests: None,
714 batch_settings: None,
715 request_headers: Default::default(),
716 response_headers: Default::default(),
717 request_variable_keys: Default::default(),
718 response_variable_keys: Default::default(),
719 error_settings: Default::default(),
720 label: "test label".into(),
721 });
722
723 let response1: http::Response<RouterBody> = http::Response::builder()
724 .body(router::body::from_bytes(r#"{"data":"value1"}"#))
725 .unwrap();
726 let response_key1 = ResponseKey::EntityField {
727 index: 0,
728 inputs: Default::default(),
729 field_name: "field".to_string(),
730 typename: Some(name!("User")),
731 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
732 };
733
734 let response2 = http::Response::builder()
735 .body(router::body::from_bytes(r#"{"data":"value2"}"#))
736 .unwrap();
737 let response_key2 = ResponseKey::EntityField {
738 index: 1,
739 inputs: Default::default(),
740 field_name: "field".to_string(),
741 typename: Some(name!("User")),
742 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
743 };
744
745 let supergraph_request = Arc::new(
746 http::Request::builder()
747 .body(graphql::Request::builder().build())
748 .unwrap(),
749 );
750
751 let res = super::aggregate_responses(vec![
752 process_response(
753 Ok(response1),
754 response_key1,
755 connector.clone(),
756 &Context::default(),
757 (None, Default::default()),
758 None,
759 supergraph_request.clone(),
760 )
761 .await
762 .mapped_response,
763 process_response(
764 Ok(response2),
765 response_key2,
766 connector,
767 &Context::default(),
768 (None, Default::default()),
769 None,
770 supergraph_request,
771 )
772 .await
773 .mapped_response,
774 ])
775 .unwrap();
776
777 assert_debug_snapshot!(res, @r###"
778 Response {
779 response: Response {
780 status: 200,
781 version: HTTP/1.1,
782 headers: {},
783 body: Response {
784 label: None,
785 data: Some(
786 Object({
787 "_entities": Array([
788 Object({
789 "__typename": String(
790 "User",
791 ),
792 "field": String(
793 "value1",
794 ),
795 }),
796 Object({
797 "__typename": String(
798 "User",
799 ),
800 "field": String(
801 "value2",
802 ),
803 }),
804 ]),
805 }),
806 ),
807 path: None,
808 errors: [],
809 extensions: {},
810 has_next: None,
811 subscribed: None,
812 created_at: None,
813 incremental: [],
814 },
815 },
816 }
817 "###);
818 }
819
820 #[tokio::test]
821 async fn test_handle_responses_errors() {
822 let connector = Arc::new(Connector {
823 spec: ConnectSpec::V0_1,
824 id: ConnectId::new(
825 "subgraph_name".into(),
826 None,
827 name!(Query),
828 name!(user),
829 None,
830 0,
831 ),
832 transport: HttpJsonTransport {
833 source_template: "http://localhost/api".parse().ok(),
834 connect_template: "/path".parse().unwrap(),
835 ..Default::default()
836 },
837 selection: JSONSelection::parse("$.data").unwrap(),
838 entity_resolver: Some(EntityResolver::Explicit),
839 config: Default::default(),
840 max_requests: None,
841 batch_settings: None,
842 request_headers: Default::default(),
843 response_headers: Default::default(),
844 request_variable_keys: Default::default(),
845 response_variable_keys: Default::default(),
846 error_settings: Default::default(),
847 label: "test label".into(),
848 });
849
850 let response_plaintext: http::Response<RouterBody> = http::Response::builder()
851 .body(router::body::from_bytes(r#"plain text"#))
852 .unwrap();
853 let response_key_plaintext = ResponseKey::Entity {
854 index: 0,
855 inputs: Default::default(),
856 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
857 };
858
859 let response1: http::Response<RouterBody> = http::Response::builder()
860 .status(404)
861 .body(router::body::from_bytes(r#"{"error":"not found"}"#))
862 .unwrap();
863 let response_key1 = ResponseKey::Entity {
864 index: 1,
865 inputs: Default::default(),
866 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
867 };
868
869 let response2 = http::Response::builder()
870 .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
871 .unwrap();
872 let response_key2 = ResponseKey::Entity {
873 index: 2,
874 inputs: Default::default(),
875 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
876 };
877
878 let response3: http::Response<RouterBody> = http::Response::builder()
879 .status(500)
880 .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
881 .unwrap();
882 let response_key3 = ResponseKey::Entity {
883 index: 3,
884 inputs: Default::default(),
885 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
886 };
887
888 let supergraph_request = Arc::new(
889 http::Request::builder()
890 .body(graphql::Request::builder().build())
891 .unwrap(),
892 );
893
894 let mut res = super::aggregate_responses(vec![
895 process_response(
896 Ok(response_plaintext),
897 response_key_plaintext,
898 connector.clone(),
899 &Context::default(),
900 (None, Default::default()),
901 None,
902 supergraph_request.clone(),
903 )
904 .await
905 .mapped_response,
906 process_response(
907 Ok(response1),
908 response_key1,
909 connector.clone(),
910 &Context::default(),
911 (None, Default::default()),
912 None,
913 supergraph_request.clone(),
914 )
915 .await
916 .mapped_response,
917 process_response(
918 Ok(response2),
919 response_key2,
920 connector.clone(),
921 &Context::default(),
922 (None, Default::default()),
923 None,
924 supergraph_request.clone(),
925 )
926 .await
927 .mapped_response,
928 process_response(
929 Ok(response3),
930 response_key3,
931 connector,
932 &Context::default(),
933 (None, Default::default()),
934 None,
935 supergraph_request,
936 )
937 .await
938 .mapped_response,
939 ])
940 .unwrap();
941
942 let body = res.response.body_mut();
946 body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();
947
948 assert_debug_snapshot!(res, @r#"
949 Response {
950 response: Response {
951 status: 200,
952 version: HTTP/1.1,
953 headers: {},
954 body: Response {
955 label: None,
956 data: Some(
957 Object({
958 "_entities": Array([
959 Null,
960 Null,
961 Object({
962 "id": String(
963 "2",
964 ),
965 }),
966 Null,
967 ]),
968 }),
969 ),
970 path: None,
971 errors: [
972 Error {
973 message: "The server returned data in an unexpected format.",
974 locations: [],
975 path: Some(
976 Path(
977 [
978 Key(
979 "_entities",
980 None,
981 ),
982 Index(
983 0,
984 ),
985 ],
986 ),
987 ),
988 extensions: {
989 "code": String(
990 "CONNECTOR_RESPONSE_INVALID",
991 ),
992 "service": String(
993 "subgraph_name",
994 ),
995 "connector": Object({
996 "coordinate": String(
997 "subgraph_name:Query.user[0]",
998 ),
999 }),
1000 "http": Object({
1001 "status": Number(200),
1002 }),
1003 "apollo.private.subgraph.name": String(
1004 "subgraph_name",
1005 ),
1006 },
1007 apollo_id: 00000000-0000-0000-0000-000000000000,
1008 },
1009 Error {
1010 message: "Request failed",
1011 locations: [],
1012 path: Some(
1013 Path(
1014 [
1015 Key(
1016 "_entities",
1017 None,
1018 ),
1019 Index(
1020 1,
1021 ),
1022 ],
1023 ),
1024 ),
1025 extensions: {
1026 "code": String(
1027 "CONNECTOR_FETCH",
1028 ),
1029 "service": String(
1030 "subgraph_name",
1031 ),
1032 "connector": Object({
1033 "coordinate": String(
1034 "subgraph_name:Query.user[0]",
1035 ),
1036 }),
1037 "http": Object({
1038 "status": Number(404),
1039 }),
1040 "apollo.private.subgraph.name": String(
1041 "subgraph_name",
1042 ),
1043 },
1044 apollo_id: 00000000-0000-0000-0000-000000000000,
1045 },
1046 Error {
1047 message: "Request failed",
1048 locations: [],
1049 path: Some(
1050 Path(
1051 [
1052 Key(
1053 "_entities",
1054 None,
1055 ),
1056 Index(
1057 3,
1058 ),
1059 ],
1060 ),
1061 ),
1062 extensions: {
1063 "code": String(
1064 "CONNECTOR_FETCH",
1065 ),
1066 "service": String(
1067 "subgraph_name",
1068 ),
1069 "connector": Object({
1070 "coordinate": String(
1071 "subgraph_name:Query.user[0]",
1072 ),
1073 }),
1074 "http": Object({
1075 "status": Number(500),
1076 }),
1077 "apollo.private.subgraph.name": String(
1078 "subgraph_name",
1079 ),
1080 },
1081 apollo_id: 00000000-0000-0000-0000-000000000000,
1082 },
1083 ],
1084 extensions: {},
1085 has_next: None,
1086 subscribed: None,
1087 created_at: None,
1088 incremental: [],
1089 },
1090 },
1091 }
1092 "#);
1093 }
1094
1095 #[tokio::test]
1096 async fn test_handle_responses_status() {
1097 let selection = JSONSelection::parse("$status").unwrap();
1098 let connector = Arc::new(Connector {
1099 spec: ConnectSpec::V0_1,
1100 id: ConnectId::new(
1101 "subgraph_name".into(),
1102 None,
1103 name!(Query),
1104 name!(hello),
1105 None,
1106 0,
1107 ),
1108 transport: HttpJsonTransport {
1109 source_template: "http://localhost/api".parse().ok(),
1110 connect_template: "/path".parse().unwrap(),
1111 ..Default::default()
1112 },
1113 selection: selection.clone(),
1114 entity_resolver: None,
1115 config: Default::default(),
1116 max_requests: None,
1117 batch_settings: None,
1118 request_headers: Default::default(),
1119 response_headers: Default::default(),
1120 request_variable_keys: Default::default(),
1121 response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1122 error_settings: Default::default(),
1123 label: "test label".into(),
1124 });
1125
1126 let response1: http::Response<RouterBody> = http::Response::builder()
1127 .status(201)
1128 .body(router::body::from_bytes(r#"{}"#))
1129 .unwrap();
1130 let response_key1 = ResponseKey::RootField {
1131 name: "hello".to_string(),
1132 inputs: Default::default(),
1133 selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1134 };
1135
1136 let supergraph_request = Arc::new(
1137 http::Request::builder()
1138 .body(graphql::Request::builder().build())
1139 .unwrap(),
1140 );
1141
1142 let res = super::aggregate_responses(vec![
1143 process_response(
1144 Ok(response1),
1145 response_key1,
1146 connector,
1147 &Context::default(),
1148 (None, Default::default()),
1149 None,
1150 supergraph_request,
1151 )
1152 .await
1153 .mapped_response,
1154 ])
1155 .unwrap();
1156
1157 assert_debug_snapshot!(res, @r###"
1158 Response {
1159 response: Response {
1160 status: 200,
1161 version: HTTP/1.1,
1162 headers: {},
1163 body: Response {
1164 label: None,
1165 data: Some(
1166 Object({
1167 "hello": Number(201),
1168 }),
1169 ),
1170 path: None,
1171 errors: [],
1172 extensions: {},
1173 has_next: None,
1174 subscribed: None,
1175 created_at: None,
1176 incremental: [],
1177 },
1178 },
1179 }
1180 "###);
1181 }
1182
1183 #[tokio::test]
1184 async fn test_handle_response_with_is_success() {
1185 let is_success = JSONSelection::parse("$status ->eq(400)").unwrap();
1186 let selection = JSONSelection::parse("$status").unwrap();
1187 let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
1188 message: Default::default(),
1189 source_extensions: Default::default(),
1190 connect_extensions: Default::default(),
1191 connect_is_success: Some(is_success.clone()),
1192 };
1193 let connector = Arc::new(Connector {
1194 spec: ConnectSpec::V0_1,
1195 id: ConnectId::new(
1196 "subgraph_name".into(),
1197 None,
1198 name!(Query),
1199 name!(hello),
1200 None,
1201 0,
1202 ),
1203 transport: HttpJsonTransport {
1204 source_template: "http://localhost/api".parse().ok(),
1205 connect_template: "/path".parse().unwrap(),
1206 ..Default::default()
1207 },
1208 selection: selection.clone(),
1209 entity_resolver: None,
1210 config: Default::default(),
1211 max_requests: None,
1212 batch_settings: None,
1213 request_headers: Default::default(),
1214 response_headers: Default::default(),
1215 request_variable_keys: Default::default(),
1216 response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1217 error_settings,
1218 label: Label::from("test label"),
1219 });
1220
1221 let response_fail: http::Response<RouterBody> = http::Response::builder()
1223 .status(201)
1224 .body(router::body::from_bytes(r#"{}"#))
1225 .unwrap();
1226 let response_fail_key = ResponseKey::RootField {
1227 name: "hello".to_string(),
1228 inputs: Default::default(),
1229 selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1230 };
1231
1232 let response_succeed: http::Response<RouterBody> = http::Response::builder()
1234 .status(400)
1235 .body(router::body::from_bytes(r#"{}"#))
1236 .unwrap();
1237 let response_succeed_key = ResponseKey::RootField {
1238 name: "hello".to_string(),
1239 inputs: Default::default(),
1240 selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1241 };
1242
1243 let supergraph_request = Arc::new(
1244 http::Request::builder()
1245 .body(graphql::Request::builder().build())
1246 .unwrap(),
1247 );
1248
1249 let res_expect_fail = super::aggregate_responses(vec![
1251 process_response(
1252 Ok(response_fail),
1253 response_fail_key,
1254 connector.clone(),
1255 &Context::default(),
1256 (None, Default::default()),
1257 None,
1258 supergraph_request.clone(),
1259 )
1260 .await
1261 .mapped_response,
1262 ])
1263 .unwrap()
1264 .response;
1265 assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
1266 assert_eq!(res_expect_fail.body().errors.len(), 1);
1267
1268 let res_expect_success = super::aggregate_responses(vec![
1270 process_response(
1271 Ok(response_succeed),
1272 response_succeed_key,
1273 connector.clone(),
1274 &Context::default(),
1275 (None, Default::default()),
1276 None,
1277 supergraph_request.clone(),
1278 )
1279 .await
1280 .mapped_response,
1281 ])
1282 .unwrap()
1283 .response;
1284 assert!(res_expect_success.body().errors.is_empty());
1285 assert_eq!(
1286 &res_expect_success.body().data,
1287 &Some(json!({"hello": json!(400)}))
1288 );
1289 }
1290}