1use std::sync::Arc;
2
3use apollo_compiler::ExecutableDocument;
4use apollo_compiler::validation::Valid;
5use apollo_federation::connectors::Connector;
6use apollo_federation::connectors::runtime::debug::ConnectorContext;
7use apollo_federation::connectors::runtime::debug::DebugRequest;
8use apollo_federation::connectors::runtime::debug::SelectionData;
9use apollo_federation::connectors::runtime::errors::Error;
10use apollo_federation::connectors::runtime::errors::RuntimeError;
11use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
12use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
13use apollo_federation::connectors::runtime::key::ResponseKey;
14use apollo_federation::connectors::runtime::mapping::Problem;
15use apollo_federation::connectors::runtime::responses::HandleResponseError;
16use apollo_federation::connectors::runtime::responses::MappedResponse;
17use apollo_federation::connectors::runtime::responses::deserialize_response;
18use apollo_federation::connectors::runtime::responses::handle_raw_response;
19use axum::body::HttpBody;
20use http::response::Parts;
21use http_body_util::BodyExt;
22use opentelemetry::KeyValue;
23use parking_lot::Mutex;
24use serde_json_bytes::Map;
25use serde_json_bytes::Value;
26use tracing::Span;
27
28use crate::Context;
29use crate::graphql;
30use crate::json_ext::Path;
31use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
32use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
33use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
35use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
36use crate::plugins::telemetry::config_new::events::log_event;
37use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
38use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
39use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
40use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
41use crate::services::connect::Response;
42use crate::services::connector;
43use crate::services::fetch::AddSubgraphNameExt;
44
45impl From<RuntimeError> for graphql::Error {
48 fn from(error: RuntimeError) -> Self {
49 let path: Path = (&error.path).into();
50
51 let err = graphql::Error::builder()
52 .message(&error.message)
53 .extensions(error.extensions())
54 .extension_code(error.code())
55 .path(path)
56 .build();
57
58 if let Some(subgraph_name) = &error.subgraph_name {
59 err.with_subgraph_name(subgraph_name)
60 } else {
61 err
62 }
63 }
64}
65
/// Converts the outcome of a single connector HTTP call into a
/// [`connector::request_service::Response`].
///
/// * `result` - the transport outcome: either the upstream HTTP response or the
///   error that prevented one from being obtained.
/// * `response_key` - identifies which part of the GraphQL response this call
///   populates; it is moved into the resulting [`MappedResponse`].
/// * `connector` - the connector whose selection, error settings and identity are
///   used for mapping and error reporting.
/// * `context` - request context; cloned into the returned response and consulted
///   when logging the connector response event.
/// * `debug_request` - a pair whose first element is forwarded to the debug context
///   as the request and whose second element contributes extra [`Problem`]s.
/// * `debug_context` - when present, receives the (valid or invalid) response for
///   debugging.
/// * `supergraph_request` - only its headers are read, for response mapping.
/// * `operation` - optional operation document applied to the mapped response.
///
/// The current tracing span's `OTEL_STATUS_CODE` is set to OK only when mapping
/// produced data, and an error event is emitted for every mapped error.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn process_response<T: HttpBody>(
    result: Result<http::Response<T>, Error>,
    response_key: ResponseKey,
    connector: Arc<Connector>,
    context: &Context,
    debug_request: DebugRequest,
    debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
    supergraph_request: Arc<http::Request<crate::graphql::Request>>,
    operation: Option<Arc<Valid<ExecutableDocument>>>,
) -> connector::request_service::Response {
    let (mapped_response, result) = match result {
        // No HTTP response was obtained: surface the transport error both as the
        // mapped (GraphQL-facing) error and as the raw transport result.
        Err(error) => {
            Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
            (
                MappedResponse::Error {
                    error: error.to_runtime_error(&connector, &response_key),
                    key: response_key,
                    problems: Vec::new(),
                },
                Err(error),
            )
        }
        Ok(response) => {
            let (parts, body) = response.into_parts();

            // The transport result only carries the response head (a clone of `parts`).
            let result = Ok(TransportResponse::Http(HttpResponse {
                inner: parts.clone(),
            }));

            // Builds the error reported when the body cannot be read or deserialized.
            // It carries the subgraph name, the connector coordinate, the
            // CONNECTOR_RESPONSE_INVALID code and the upstream HTTP status.
            let make_err = || {
                let mut err = RuntimeError::new(
                    "The server returned data in an unexpected format.".to_string(),
                    &response_key,
                );
                err.subgraph_name = Some(connector.id.subgraph_name.clone());
                err = err.with_code("CONNECTOR_RESPONSE_INVALID");
                err.coordinate = Some(connector.id.coordinate());
                err = err.extension(
                    "http",
                    Value::Object(Map::from_iter([(
                        "status".into(),
                        Value::Number(parts.status.as_u16().into()),
                    )])),
                );
                err
            };

            // Both failure modes (body could not be collected, body could not be
            // deserialized) are collapsed to `()` so they end up as the same
            // `make_err()` error below.
            let deserialized_body = body
                .collect()
                .await
                .map_err(|_| ())
                .and_then(|body| {
                    let body = body.to_bytes();
                    let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
                        // Record the undeserializable body in the debug context, if any.
                        if let Some(debug_context) = debug_context {
                            debug_context.lock().push_invalid_response(
                                debug_request.0.clone(),
                                &parts,
                                &body,
                                &connector.error_settings,
                                debug_request.1.clone(),
                            );
                        }
                    });
                    // The response event is logged whenever the body was read, whether
                    // or not it deserialized; it is skipped if the body could not be
                    // collected at all.
                    log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
                    raw
                })
                .map_err(|()| make_err());

            let mapped = match &deserialized_body {
                Err(error) => MappedResponse::Error {
                    error: error.clone(),
                    key: response_key,
                    problems: Vec::new(),
                },
                // Map the deserialized body, then apply the operation document
                // (when one was supplied) together with the connector's subtypes map.
                Ok(data) => handle_raw_response(
                    data,
                    &parts,
                    response_key,
                    &connector,
                    context,
                    supergraph_request.headers(),
                )
                .apply_operation(
                    operation
                        .as_ref()
                        .map(|arc_valid_doc| arc_valid_doc.as_ref().as_ref()),
                    &connector.schema_subtypes_map,
                ),
            };

            if let Some(debug) = debug_context {
                // Problems found while mapping, followed by those carried by the
                // debug request.
                let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
                debug_problems.extend(debug_request.1);

                // Selection details are only available when mapping produced data.
                let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
                    Some(SelectionData {
                        source: connector.selection.to_string(),
                        transformed: key.selection().to_string(),
                        result: Some(data.clone()),
                    })
                } else {
                    None
                };

                // `Value::Null` stands in for the body when nothing was deserialized.
                debug.lock().push_response(
                    debug_request.0,
                    &parts,
                    deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
                    selection_data,
                    &connector.error_settings,
                    debug_problems,
                );
            }
            // The span is OK only when mapping produced data.
            if matches!(mapped, MappedResponse::Data { .. }) {
                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
            } else {
                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
            }

            (mapped, result)
        }
    };

    // Emit an error event for mapped errors from either branch above.
    if let MappedResponse::Error { ref error, .. } = mapped_response {
        emit_error_event(error.code(), &error.message, Some((*error.path).into()));
    }

    connector::request_service::Response {
        context: context.clone(),
        transport_result: result,
        mapped_response,
    }
}
207
208pub(crate) fn aggregate_responses(
209 responses: Vec<MappedResponse>,
210 _context: Context,
211) -> Result<Response, HandleResponseError> {
212 let mut data = serde_json_bytes::Map::new();
213 let mut errors = Vec::new();
214 let count = responses.len();
215
216 for mapped in responses {
217 mapped.add_to_data(&mut data, &mut errors, count)?;
218 }
219
220 let data = if data.is_empty() {
221 Value::Null
222 } else {
223 Value::Object(data)
224 };
225
226 Span::current().record(
227 OTEL_STATUS_CODE,
228 if errors.is_empty() {
229 OTEL_STATUS_CODE_OK
230 } else {
231 OTEL_STATUS_CODE_ERROR
232 },
233 );
234
235 Ok(Response {
236 response: http::Response::builder()
237 .body(
238 graphql::Response::builder()
239 .data(data)
240 .errors(errors.into_iter().map(|e| e.into()).collect())
241 .build(),
242 )
243 .unwrap(),
244 })
245}
246
247fn log_connectors_event(
248 context: &Context,
249 body: &[u8],
250 parts: &Parts,
251 response_key: ResponseKey,
252 connector: &Connector,
253) {
254 let log_response_level = context
255 .extensions()
256 .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
257 .and_then(|event| {
258 let response = connector::request_service::Response {
270 context: context.clone(),
271 transport_result: Ok(TransportResponse::Http(HttpResponse {
272 inner: parts.clone(),
273 })),
274 mapped_response: MappedResponse::Data {
275 data: Value::Null,
276 key: response_key,
277 problems: vec![],
278 },
279 };
280 if event.condition.evaluate_response(&response) {
281 Some(event.level)
282 } else {
283 None
284 }
285 });
286
287 if let Some(level) = log_response_level {
288 let mut attrs = Vec::with_capacity(4);
289 #[cfg(test)]
290 let headers = {
291 let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
292 .headers
293 .iter()
294 .map(|(name, val)| (name.to_string(), val.clone()))
295 .collect();
296 headers.sort_keys();
297 headers
298 };
299 #[cfg(not(test))]
300 let headers = &parts.headers;
301
302 attrs.push(KeyValue::new(
303 HTTP_RESPONSE_HEADERS,
304 opentelemetry::Value::String(format!("{headers:?}").into()),
305 ));
306 attrs.push(KeyValue::new(
307 HTTP_RESPONSE_STATUS,
308 opentelemetry::Value::String(format!("{}", parts.status).into()),
309 ));
310 attrs.push(KeyValue::new(
311 HTTP_RESPONSE_VERSION,
312 opentelemetry::Value::String(format!("{:?}", parts.version).into()),
313 ));
314 attrs.push(KeyValue::new(
315 HTTP_RESPONSE_BODY,
316 opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
317 ));
318
319 log_event(
320 level,
321 "connector.response",
322 attrs,
323 &format!("Response from connector {label:?}", label = connector.label),
324 );
325 }
326}
327
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use apollo_compiler::Schema;
    use apollo_compiler::collections::IndexMap;
    use apollo_compiler::name;
    use apollo_compiler::response::JsonValue;
    use apollo_federation::connectors::ConnectId;
    use apollo_federation::connectors::ConnectSpec;
    use apollo_federation::connectors::Connector;
    use apollo_federation::connectors::ConnectorErrorsSettings;
    use apollo_federation::connectors::EntityResolver;
    use apollo_federation::connectors::HTTPMethod;
    use apollo_federation::connectors::HttpJsonTransport;
    use apollo_federation::connectors::JSONSelection;
    use apollo_federation::connectors::Label;
    use apollo_federation::connectors::Namespace;
    use apollo_federation::connectors::runtime::inputs::RequestInputs;
    use apollo_federation::connectors::runtime::key::ResponseKey;
    use apollo_federation::connectors::runtime::responses::MappedResponse;
    use insta::assert_debug_snapshot;
    use itertools::Itertools;
    use serde_json_bytes::json;

    use crate::Context;
    use crate::graphql;
    use crate::plugins::connectors::handle_responses::process_response;
    use crate::services::router;
    use crate::services::router::body::RouterBody;

    /// Builds the connector configuration shared by the tests in this module.
    ///
    /// The result is a `ConnectSpec::V0_1` connector with no entity resolver that
    /// targets `http://localhost/api` + `/path`, labelled "test label", with every
    /// other setting left at its default. Tests override individual fields with
    /// struct-update syntax (`Connector { field: .., ..test_connector(..) }`).
    fn test_connector(id: ConnectId, selection: &str) -> Connector {
        Connector {
            spec: ConnectSpec::V0_1,
            schema_subtypes_map: Default::default(),
            id,
            transport: HttpJsonTransport {
                source_template: "http://localhost/api".parse().ok(),
                connect_template: "/path".parse().unwrap(),
                ..Default::default()
            },
            selection: JSONSelection::parse(selection).unwrap(),
            entity_resolver: None,
            config: Default::default(),
            max_requests: None,
            batch_settings: None,
            request_headers: Default::default(),
            response_headers: Default::default(),
            request_variable_keys: Default::default(),
            response_variable_keys: Default::default(),
            error_settings: Default::default(),
            label: Label::from("test label"),
        }
    }

    /// Builds a supergraph request with a default GraphQL body and no headers.
    fn empty_supergraph_request() -> Arc<http::Request<graphql::Request>> {
        Arc::new(
            http::Request::builder()
                .body(graphql::Request::builder().build())
                .unwrap(),
        )
    }

    /// Runs `process_response` on a successful transport result with a fresh default
    /// context, no debugging and no operation document, returning only the mapped
    /// response.
    async fn map_response(
        response: http::Response<RouterBody>,
        response_key: ResponseKey,
        connector: &Arc<Connector>,
        supergraph_request: &Arc<http::Request<graphql::Request>>,
    ) -> MappedResponse {
        process_response(
            Ok(response),
            response_key,
            connector.clone(),
            &Context::default(),
            (None, Default::default()),
            None,
            supergraph_request.clone(),
            Default::default(),
        )
        .await
        .mapped_response
    }

    /// Two root-field responses are merged into one data object keyed by field name.
    #[tokio::test]
    async fn test_handle_responses_root_fields() {
        let connector = Arc::new(test_connector(
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(hello),
                None,
                0,
            ),
            "$.data",
        ));

        let response1: http::Response<RouterBody> = http::Response::builder()
            .body(router::body::from_bytes(r#"{"data":"world"}"#))
            .unwrap();
        let response_key1 = ResponseKey::RootField {
            name: "hello".to_string(),
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let response2 = http::Response::builder()
            .body(router::body::from_bytes(r#"{"data":"world"}"#))
            .unwrap();
        let response_key2 = ResponseKey::RootField {
            name: "hello2".to_string(),
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(
            vec![
                map_response(response1, response_key1, &connector, &supergraph_request).await,
                map_response(response2, response_key2, &connector, &supergraph_request).await,
            ],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "hello": String(
                            "world",
                        ),
                        "hello2": String(
                            "world",
                        ),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// Entity responses are collected into the `_entities` array at their key's index.
    #[tokio::test]
    async fn test_handle_responses_entities() {
        let connector = Arc::new(Connector {
            entity_resolver: Some(EntityResolver::Explicit),
            ..test_connector(
                ConnectId::new(
                    "subgraph_name".into(),
                    None,
                    name!(Query),
                    name!(user),
                    None,
                    0,
                ),
                "$.data { id }",
            )
        });

        let response1: http::Response<RouterBody> = http::Response::builder()
            .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
            .unwrap();
        let response_key1 = ResponseKey::Entity {
            index: 0,
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let response2 = http::Response::builder()
            .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
            .unwrap();
        let response_key2 = ResponseKey::Entity {
            index: 1,
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(
            vec![
                map_response(response1, response_key1, &connector, &supergraph_request).await,
                map_response(response2, response_key2, &connector, &supergraph_request).await,
            ],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "_entities": Array([
                            Object({
                                "id": String(
                                    "1",
                                ),
                            }),
                            Object({
                                "id": String(
                                    "2",
                                ),
                            }),
                        ]),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// A single batch response is split into entities ordered like the batch inputs.
    #[tokio::test]
    async fn test_handle_responses_batch() {
        let connector = Arc::new(Connector {
            spec: ConnectSpec::V0_2,
            transport: HttpJsonTransport {
                source_template: "http://localhost/api".parse().ok(),
                connect_template: "/path".parse().unwrap(),
                method: HTTPMethod::Post,
                body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
                ..Default::default()
            },
            entity_resolver: Some(EntityResolver::TypeBatch),
            ..test_connector(
                ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
                "$.data { id name }",
            )
        });

        let keys = connector
            .resolvable_key(
                &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
                    .unwrap(),
            )
            .unwrap()
            .unwrap();

        let response1: http::Response<RouterBody> = http::Response::builder()
            .body(router::body::from_bytes(
                // The response lists id "2" before id "1", the reverse of the batch
                // inputs below; the snapshot expects the entities in input order.
                r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
            ))
            .unwrap();

        let mut inputs: RequestInputs = RequestInputs::default();
        let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
        inputs.batch = representations
            .as_array()
            .unwrap()
            .iter()
            .map(|v| v.as_object().unwrap().clone())
            .collect_vec();

        let response_key1 = ResponseKey::BatchEntity {
            selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
            keys,
            inputs,
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(
            vec![map_response(response1, response_key1, &connector, &supergraph_request).await],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "_entities": Array([
                            Object({
                                "id": String(
                                    "1",
                                ),
                                "name": String(
                                    "A",
                                ),
                            }),
                            Object({
                                "id": String(
                                    "2",
                                ),
                                "name": String(
                                    "B",
                                ),
                            }),
                        ]),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// Entity-field responses produce `_entities` items holding `__typename` and the field.
    #[tokio::test]
    async fn test_handle_responses_entity_field() {
        let connector = Arc::new(Connector {
            entity_resolver: Some(EntityResolver::Implicit),
            ..test_connector(
                ConnectId::new(
                    "subgraph_name".into(),
                    None,
                    name!(User),
                    name!(field),
                    None,
                    0,
                ),
                "$.data",
            )
        });

        let response1: http::Response<RouterBody> = http::Response::builder()
            .body(router::body::from_bytes(r#"{"data":"value1"}"#))
            .unwrap();
        let response_key1 = ResponseKey::EntityField {
            index: 0,
            inputs: Default::default(),
            field_name: "field".to_string(),
            typename: Some(name!("User")),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let response2 = http::Response::builder()
            .body(router::body::from_bytes(r#"{"data":"value2"}"#))
            .unwrap();
        let response_key2 = ResponseKey::EntityField {
            index: 1,
            inputs: Default::default(),
            field_name: "field".to_string(),
            typename: Some(name!("User")),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(
            vec![
                map_response(response1, response_key1, &connector, &supergraph_request).await,
                map_response(response2, response_key2, &connector, &supergraph_request).await,
            ],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "_entities": Array([
                            Object({
                                "__typename": String(
                                    "User",
                                ),
                                "field": String(
                                    "value1",
                                ),
                            }),
                            Object({
                                "__typename": String(
                                    "User",
                                ),
                                "field": String(
                                    "value2",
                                ),
                            }),
                        ]),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// Non-JSON bodies and 404/500 responses become errors with `null` entity slots,
    /// while the successful response still contributes its entity.
    #[tokio::test]
    async fn test_handle_responses_errors() {
        let connector = Arc::new(Connector {
            entity_resolver: Some(EntityResolver::Explicit),
            ..test_connector(
                ConnectId::new(
                    "subgraph_name".into(),
                    None,
                    name!(Query),
                    name!(user),
                    None,
                    0,
                ),
                "$.data",
            )
        });

        let response_plaintext: http::Response<RouterBody> = http::Response::builder()
            .body(router::body::from_bytes(r#"plain text"#))
            .unwrap();
        let response_key_plaintext = ResponseKey::Entity {
            index: 0,
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let response1: http::Response<RouterBody> = http::Response::builder()
            .status(404)
            .body(router::body::from_bytes(r#"{"error":"not found"}"#))
            .unwrap();
        let response_key1 = ResponseKey::Entity {
            index: 1,
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let response2 = http::Response::builder()
            .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
            .unwrap();
        let response_key2 = ResponseKey::Entity {
            index: 2,
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let response3: http::Response<RouterBody> = http::Response::builder()
            .status(500)
            .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
            .unwrap();
        let response_key3 = ResponseKey::Entity {
            index: 3,
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        let mut res = super::aggregate_responses(
            vec![
                map_response(
                    response_plaintext,
                    response_key_plaintext,
                    &connector,
                    &supergraph_request,
                )
                .await,
                map_response(response1, response_key1, &connector, &supergraph_request).await,
                map_response(response2, response_key2, &connector, &supergraph_request).await,
                map_response(response3, response_key3, &connector, &supergraph_request).await,
            ],
            Context::new(),
        )
        .unwrap();

        // Replace every error's `apollo_id` with the null id so the snapshot below
        // (which shows all-zero ids) is stable.
        let body = res.response.body_mut();
        body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "_entities": Array([
                            Null,
                            Null,
                            Object({
                                "id": String(
                                    "2",
                                ),
                            }),
                            Null,
                        ]),
                    }),
                ),
                path: None,
                errors: [
                    Error {
                        message: "The server returned data in an unexpected format.",
                        locations: [],
                        path: Some(
                            Path(
                                [
                                    Key(
                                        "_entities",
                                        None,
                                    ),
                                    Index(
                                        0,
                                    ),
                                ],
                            ),
                        ),
                        extensions: {
                            "code": String(
                                "CONNECTOR_RESPONSE_INVALID",
                            ),
                            "service": String(
                                "subgraph_name",
                            ),
                            "connector": Object({
                                "coordinate": String(
                                    "subgraph_name:Query.user[0]",
                                ),
                            }),
                            "http": Object({
                                "status": Number(200),
                            }),
                            "apollo.private.subgraph.name": String(
                                "subgraph_name",
                            ),
                        },
                        apollo_id: 00000000-0000-0000-0000-000000000000,
                    },
                    Error {
                        message: "Request failed",
                        locations: [],
                        path: Some(
                            Path(
                                [
                                    Key(
                                        "_entities",
                                        None,
                                    ),
                                    Index(
                                        1,
                                    ),
                                ],
                            ),
                        ),
                        extensions: {
                            "code": String(
                                "CONNECTOR_FETCH",
                            ),
                            "service": String(
                                "subgraph_name",
                            ),
                            "connector": Object({
                                "coordinate": String(
                                    "subgraph_name:Query.user[0]",
                                ),
                            }),
                            "http": Object({
                                "status": Number(404),
                            }),
                            "apollo.private.subgraph.name": String(
                                "subgraph_name",
                            ),
                        },
                        apollo_id: 00000000-0000-0000-0000-000000000000,
                    },
                    Error {
                        message: "Request failed",
                        locations: [],
                        path: Some(
                            Path(
                                [
                                    Key(
                                        "_entities",
                                        None,
                                    ),
                                    Index(
                                        3,
                                    ),
                                ],
                            ),
                        ),
                        extensions: {
                            "code": String(
                                "CONNECTOR_FETCH",
                            ),
                            "service": String(
                                "subgraph_name",
                            ),
                            "connector": Object({
                                "coordinate": String(
                                    "subgraph_name:Query.user[0]",
                                ),
                            }),
                            "http": Object({
                                "status": Number(500),
                            }),
                            "apollo.private.subgraph.name": String(
                                "subgraph_name",
                            ),
                        },
                        apollo_id: 00000000-0000-0000-0000-000000000000,
                    },
                ],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// A `$status` selection maps the upstream HTTP status code into the data.
    #[tokio::test]
    async fn test_handle_responses_status() {
        let connector = Arc::new(Connector {
            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
            ..test_connector(
                ConnectId::new(
                    "subgraph_name".into(),
                    None,
                    name!(Query),
                    name!(hello),
                    None,
                    0,
                ),
                "$status",
            )
        });

        let response1: http::Response<RouterBody> = http::Response::builder()
            .status(201)
            .body(router::body::from_bytes(r#"{}"#))
            .unwrap();
        let response_key1 = ResponseKey::RootField {
            name: "hello".to_string(),
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(
            vec![map_response(response1, response_key1, &connector, &supergraph_request).await],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "hello": Number(201),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// With `connect_is_success` set to `$status ->eq(400)`, a 201 response is
    /// reported as an error while a 400 response is mapped as data.
    #[tokio::test]
    async fn test_handle_response_with_is_success() {
        let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
            message: Default::default(),
            source_extensions: Default::default(),
            connect_extensions: Default::default(),
            connect_is_success: Some(JSONSelection::parse("$status ->eq(400)").unwrap()),
        };
        let connector = Arc::new(Connector {
            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
            error_settings,
            ..test_connector(
                ConnectId::new(
                    "subgraph_name".into(),
                    None,
                    name!(Query),
                    name!(hello),
                    None,
                    0,
                ),
                "$status",
            )
        });

        // Status 201 does not satisfy the is-success condition.
        let response_fail: http::Response<RouterBody> = http::Response::builder()
            .status(201)
            .body(router::body::from_bytes(r#"{}"#))
            .unwrap();
        let response_fail_key = ResponseKey::RootField {
            name: "hello".to_string(),
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
        };

        // Status 400 satisfies the is-success condition.
        let response_succeed: http::Response<RouterBody> = http::Response::builder()
            .status(400)
            .body(router::body::from_bytes(r#"{}"#))
            .unwrap();
        let response_succeed_key = ResponseKey::RootField {
            name: "hello".to_string(),
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        // The 201 response must yield null data and exactly one error.
        let res_expect_fail = super::aggregate_responses(
            vec![
                map_response(
                    response_fail,
                    response_fail_key,
                    &connector,
                    &supergraph_request,
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap()
        .response;
        assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
        assert_eq!(res_expect_fail.body().errors.len(), 1);

        // The 400 response must yield the status as data and no errors.
        let res_expect_success = super::aggregate_responses(
            vec![
                map_response(
                    response_succeed,
                    response_succeed_key,
                    &connector,
                    &supergraph_request,
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap()
        .response;
        assert!(res_expect_success.body().errors.is_empty());
        assert_eq!(
            &res_expect_success.body().data,
            &Some(json!({"hello": json!(400)}))
        );
    }
}