1use std::sync::Arc;
2
3use apollo_compiler::ExecutableDocument;
4use apollo_compiler::validation::Valid;
5use apollo_federation::connectors::Connector;
6use apollo_federation::connectors::runtime::debug::ConnectorContext;
7use apollo_federation::connectors::runtime::debug::DebugRequest;
8use apollo_federation::connectors::runtime::debug::SelectionData;
9use apollo_federation::connectors::runtime::errors::Error;
10use apollo_federation::connectors::runtime::errors::RuntimeError;
11use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
12use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
13use apollo_federation::connectors::runtime::key::ResponseKey;
14use apollo_federation::connectors::runtime::mapping::Problem;
15use apollo_federation::connectors::runtime::responses::HandleResponseError;
16use apollo_federation::connectors::runtime::responses::MappedResponse;
17use apollo_federation::connectors::runtime::responses::deserialize_response;
18use apollo_federation::connectors::runtime::responses::handle_raw_response;
19use axum::body::HttpBody;
20use http::response::Parts;
21use http_body_util::BodyExt;
22use opentelemetry::KeyValue;
23use parking_lot::Mutex;
24use serde_json_bytes::Map;
25use serde_json_bytes::Value;
26use tracing::Span;
27
28use crate::Context;
29use crate::graphql;
30use crate::json_ext::Path;
31use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
32use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
33use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
35use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
36use crate::plugins::telemetry::config_new::events::log_event;
37use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
38use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
39use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
40use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
41use crate::services::connect::Response;
42use crate::services::connector;
43use crate::services::fetch::AddSubgraphNameExt;
44
45impl From<RuntimeError> for graphql::Error {
48 fn from(error: RuntimeError) -> Self {
49 let path: Path = (&error.path).into();
50
51 let err = graphql::Error::builder()
52 .message(&error.message)
53 .extensions(error.extensions())
54 .extension_code(error.code())
55 .path(path)
56 .build();
57
58 if let Some(subgraph_name) = &error.subgraph_name {
59 err.with_subgraph_name(subgraph_name)
60 } else {
61 err
62 }
63 }
64}
65
66#[allow(clippy::too_many_arguments)]
69pub(crate) async fn process_response<T: HttpBody>(
70 result: Result<http::Response<T>, Error>,
71 response_key: ResponseKey,
72 connector: Arc<Connector>,
73 context: &Context,
74 debug_request: DebugRequest,
75 debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
76 supergraph_request: Arc<http::Request<crate::graphql::Request>>,
77 operation: Option<Arc<Valid<ExecutableDocument>>>,
78) -> connector::request_service::Response {
79 let (mapped_response, result) = match result {
80 Err(error) => {
82 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
83 (
84 MappedResponse::Error {
85 error: error.to_runtime_error(&connector, &response_key),
86 key: response_key,
87 problems: Vec::new(),
88 },
89 Err(error),
90 )
91 }
92 Ok(response) => {
93 let (parts, body) = response.into_parts();
94
95 let result = Ok(TransportResponse::Http(HttpResponse {
96 inner: parts.clone(),
97 }));
98
99 let make_err = || {
100 let mut err = RuntimeError::new(
101 "The server returned data in an unexpected format.".to_string(),
102 &response_key,
103 );
104 err.subgraph_name = Some(connector.id.subgraph_name.clone());
105 err = err.with_code("CONNECTOR_RESPONSE_INVALID");
106 err.coordinate = Some(connector.id.coordinate());
107 err = err.extension(
108 "http",
109 Value::Object(Map::from_iter([(
110 "status".into(),
111 Value::Number(parts.status.as_u16().into()),
112 )])),
113 );
114 err
115 };
116
117 let deserialized_body = body
118 .collect()
119 .await
120 .map_err(|_| ())
121 .and_then(|body| {
122 let body = body.to_bytes();
123 let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
124 if let Some(debug_context) = debug_context {
125 debug_context.lock().push_invalid_response(
126 debug_request.0.clone(),
127 &parts,
128 &body,
129 &connector.error_settings,
130 debug_request.1.clone(),
131 );
132 }
133 });
134 log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
135 raw
136 })
137 .map_err(|()| make_err());
138
139 let mapped = match &deserialized_body {
143 Err(error) => MappedResponse::Error {
144 error: error.clone(),
145 key: response_key,
146 problems: Vec::new(),
147 },
148 Ok(data) => handle_raw_response(
149 data,
150 &parts,
151 response_key,
152 &connector,
153 context,
154 supergraph_request.headers(),
155 )
156 .apply_operation(
157 operation
158 .as_ref()
159 .map(|arc_valid_doc| arc_valid_doc.as_ref().as_ref()),
160 &connector.schema_subtypes_map,
161 ),
162 };
163
164 if let Some(debug) = debug_context {
165 let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
166 debug_problems.extend(debug_request.1);
167
168 let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
169 Some(SelectionData {
170 source: connector.selection.to_string(),
171 transformed: key.selection().to_string(),
172 result: Some(data.clone()),
173 })
174 } else {
175 None
176 };
177
178 debug.lock().push_response(
179 debug_request.0,
180 &parts,
181 deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
182 selection_data,
183 &connector.error_settings,
184 debug_problems,
185 );
186 }
187 if matches!(mapped, MappedResponse::Data { .. }) {
188 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
189 } else {
190 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
191 }
192
193 (mapped, result)
194 }
195 };
196
197 if let MappedResponse::Error { ref error, .. } = mapped_response {
198 emit_error_event(error.code(), &error.message, Some((*error.path).into()));
199 }
200
201 connector::request_service::Response {
202 transport_result: result,
203 mapped_response,
204 }
205}
206
207pub(crate) fn aggregate_responses(
208 responses: Vec<MappedResponse>,
209 _context: Context,
210) -> Result<Response, HandleResponseError> {
211 let mut data = serde_json_bytes::Map::new();
212 let mut errors = Vec::new();
213 let count = responses.len();
214
215 for mapped in responses {
216 mapped.add_to_data(&mut data, &mut errors, count)?;
217 }
218
219 let data = if data.is_empty() {
220 Value::Null
221 } else {
222 Value::Object(data)
223 };
224
225 Span::current().record(
226 OTEL_STATUS_CODE,
227 if errors.is_empty() {
228 OTEL_STATUS_CODE_OK
229 } else {
230 OTEL_STATUS_CODE_ERROR
231 },
232 );
233
234 Ok(Response {
235 response: http::Response::builder()
236 .body(
237 graphql::Response::builder()
238 .data(data)
239 .errors(errors.into_iter().map(|e| e.into()).collect())
240 .build(),
241 )
242 .unwrap(),
243 })
244}
245
246fn log_connectors_event(
247 context: &Context,
248 body: &[u8],
249 parts: &Parts,
250 response_key: ResponseKey,
251 connector: &Connector,
252) {
253 let log_response_level = context
254 .extensions()
255 .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
256 .and_then(|event| {
257 let response = connector::request_service::Response {
269 transport_result: Ok(TransportResponse::Http(HttpResponse {
270 inner: parts.clone(),
271 })),
272 mapped_response: MappedResponse::Data {
273 data: Value::Null,
274 key: response_key,
275 problems: vec![],
276 },
277 };
278 if event.condition.evaluate_response(&response) {
279 Some(event.level)
280 } else {
281 None
282 }
283 });
284
285 if let Some(level) = log_response_level {
286 let mut attrs = Vec::with_capacity(4);
287 #[cfg(test)]
288 let headers = {
289 let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
290 .headers
291 .iter()
292 .map(|(name, val)| (name.to_string(), val.clone()))
293 .collect();
294 headers.sort_keys();
295 headers
296 };
297 #[cfg(not(test))]
298 let headers = &parts.headers;
299
300 attrs.push(KeyValue::new(
301 HTTP_RESPONSE_HEADERS,
302 opentelemetry::Value::String(format!("{headers:?}").into()),
303 ));
304 attrs.push(KeyValue::new(
305 HTTP_RESPONSE_STATUS,
306 opentelemetry::Value::String(format!("{}", parts.status).into()),
307 ));
308 attrs.push(KeyValue::new(
309 HTTP_RESPONSE_VERSION,
310 opentelemetry::Value::String(format!("{:?}", parts.version).into()),
311 ));
312 attrs.push(KeyValue::new(
313 HTTP_RESPONSE_BODY,
314 opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
315 ));
316
317 log_event(
318 level,
319 "connector.response",
320 attrs,
321 &format!("Response from connector {label:?}", label = connector.label),
322 );
323 }
324}
325
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use apollo_compiler::Schema;
    use apollo_compiler::collections::IndexMap;
    use apollo_compiler::name;
    use apollo_compiler::response::JsonValue;
    use apollo_federation::connectors::ConnectId;
    use apollo_federation::connectors::ConnectSpec;
    use apollo_federation::connectors::Connector;
    use apollo_federation::connectors::ConnectorErrorsSettings;
    use apollo_federation::connectors::EntityResolver;
    use apollo_federation::connectors::HTTPMethod;
    use apollo_federation::connectors::HttpJsonTransport;
    use apollo_federation::connectors::JSONSelection;
    use apollo_federation::connectors::Label;
    use apollo_federation::connectors::Namespace;
    use apollo_federation::connectors::runtime::inputs::RequestInputs;
    use apollo_federation::connectors::runtime::key::ResponseKey;
    use apollo_federation::connectors::runtime::responses::MappedResponse;
    use insta::assert_debug_snapshot;
    use itertools::Itertools;
    use serde_json_bytes::json;

    use crate::Context;
    use crate::graphql;
    use crate::plugins::connectors::handle_responses::process_response;
    use crate::services::router;
    use crate::services::router::body::RouterBody;

    /// Builds the connector shared by every test: spec `V0_1`, source
    /// `http://localhost/api`, path `/path`, default settings and the label
    /// `"test label"`. Tests adjust individual fields on the returned value.
    fn test_connector(
        id: ConnectId,
        selection: &str,
        entity_resolver: Option<EntityResolver>,
    ) -> Connector {
        Connector {
            spec: ConnectSpec::V0_1,
            schema_subtypes_map: Default::default(),
            id,
            transport: HttpJsonTransport {
                source_template: "http://localhost/api".parse().ok(),
                connect_template: "/path".parse().unwrap(),
                ..Default::default()
            },
            selection: JSONSelection::parse(selection).unwrap(),
            entity_resolver,
            config: Default::default(),
            max_requests: None,
            batch_settings: None,
            request_headers: Default::default(),
            response_headers: Default::default(),
            request_variable_keys: Default::default(),
            response_variable_keys: Default::default(),
            error_settings: Default::default(),
            label: Label::from("test label"),
        }
    }

    /// Builds an HTTP response with the given status and body.
    fn http_response(status: u16, body: &'static str) -> http::Response<RouterBody> {
        http::Response::builder()
            .status(status)
            .body(router::body::from_bytes(body))
            .unwrap()
    }

    /// Builds a supergraph request carrying a default GraphQL request.
    fn empty_supergraph_request() -> Arc<http::Request<graphql::Request>> {
        Arc::new(
            http::Request::builder()
                .body(graphql::Request::builder().build())
                .unwrap(),
        )
    }

    /// Runs `process_response` on a successful transport result with a default
    /// context, no debugging and no operation, returning only the mapped response.
    async fn map_response(
        response: http::Response<RouterBody>,
        response_key: ResponseKey,
        connector: Arc<Connector>,
        supergraph_request: Arc<http::Request<graphql::Request>>,
    ) -> MappedResponse {
        process_response(
            Ok(response),
            response_key,
            connector,
            &Context::default(),
            (None, Default::default()),
            None,
            supergraph_request,
            Default::default(),
        )
        .await
        .mapped_response
    }

    /// Two root-field responses end up as two top-level keys in `data`.
    #[tokio::test]
    async fn test_handle_responses_root_fields() {
        let connector = Arc::new(test_connector(
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(hello),
                None,
                0,
            ),
            "$.data",
            None,
        ));

        let response_key1 = ResponseKey::RootField {
            name: "hello".to_string(),
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };
        let response_key2 = ResponseKey::RootField {
            name: "hello2".to_string(),
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(
            vec![
                map_response(
                    http_response(200, r#"{"data":"world"}"#),
                    response_key1,
                    connector.clone(),
                    supergraph_request.clone(),
                )
                .await,
                map_response(
                    http_response(200, r#"{"data":"world"}"#),
                    response_key2,
                    connector,
                    supergraph_request,
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "hello": String(
                            "world",
                        ),
                        "hello2": String(
                            "world",
                        ),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// Entity responses are collected into the `_entities` array by index.
    #[tokio::test]
    async fn test_handle_responses_entities() {
        let connector = Arc::new(test_connector(
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(user),
                None,
                0,
            ),
            "$.data { id }",
            Some(EntityResolver::Explicit),
        ));

        let entity_key = |index| ResponseKey::Entity {
            index,
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(
            vec![
                map_response(
                    http_response(200, r#"{"data":{"id": "1"}}"#),
                    entity_key(0),
                    connector.clone(),
                    supergraph_request.clone(),
                )
                .await,
                map_response(
                    http_response(200, r#"{"data":{"id": "2"}}"#),
                    entity_key(1),
                    connector,
                    supergraph_request,
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "_entities": Array([
                            Object({
                                "id": String(
                                    "1",
                                ),
                            }),
                            Object({
                                "id": String(
                                    "2",
                                ),
                            }),
                        ]),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// A single batch response is split back into one entity per representation.
    #[tokio::test]
    async fn test_handle_responses_batch() {
        let mut connector = test_connector(
            ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
            "$.data { id name }",
            Some(EntityResolver::TypeBatch),
        );
        connector.spec = ConnectSpec::V0_2;
        connector.transport.method = HTTPMethod::Post;
        connector.transport.body = Some(JSONSelection::parse("ids: $batch.id").unwrap());
        let connector = Arc::new(connector);

        let keys = connector
            .resolvable_key(
                &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
                    .unwrap(),
            )
            .unwrap()
            .unwrap();

        // The items come back in the opposite order to the representations
        // below; the snapshot expects them in representation order.
        let response1 = http_response(
            200,
            r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
        );

        let mut inputs: RequestInputs = RequestInputs::default();
        let representations =
            json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
        inputs.batch = representations
            .as_array()
            .unwrap()
            .iter()
            .cloned()
            .map(|v| v.as_object().unwrap().clone())
            .collect_vec();

        let response_key1 = ResponseKey::BatchEntity {
            selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
            keys,
            inputs,
        };

        let res = super::aggregate_responses(
            vec![
                map_response(
                    response1,
                    response_key1,
                    connector,
                    empty_supergraph_request(),
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "_entities": Array([
                            Object({
                                "id": String(
                                    "1",
                                ),
                                "name": String(
                                    "A",
                                ),
                            }),
                            Object({
                                "id": String(
                                    "2",
                                ),
                                "name": String(
                                    "B",
                                ),
                            }),
                        ]),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// Entity-field responses produce `_entities` items holding `__typename`
    /// and the resolved field.
    #[tokio::test]
    async fn test_handle_responses_entity_field() {
        let connector = Arc::new(test_connector(
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(User),
                name!(field),
                None,
                0,
            ),
            "$.data",
            Some(EntityResolver::Implicit),
        ));

        let field_key = |index| ResponseKey::EntityField {
            index,
            inputs: Default::default(),
            field_name: "field".to_string(),
            typename: Some(name!("User")),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        let res = super::aggregate_responses(
            vec![
                map_response(
                    http_response(200, r#"{"data":"value1"}"#),
                    field_key(0),
                    connector.clone(),
                    supergraph_request.clone(),
                )
                .await,
                map_response(
                    http_response(200, r#"{"data":"value2"}"#),
                    field_key(1),
                    connector,
                    supergraph_request,
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "_entities": Array([
                            Object({
                                "__typename": String(
                                    "User",
                                ),
                                "field": String(
                                    "value1",
                                ),
                            }),
                            Object({
                                "__typename": String(
                                    "User",
                                ),
                                "field": String(
                                    "value2",
                                ),
                            }),
                        ]),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// A non-JSON body and 404/500 responses become errors with `null`
    /// entities, while the one valid response is still returned.
    #[tokio::test]
    async fn test_handle_responses_errors() {
        let connector = Arc::new(test_connector(
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(user),
                None,
                0,
            ),
            "$.data",
            Some(EntityResolver::Explicit),
        ));

        let entity_key = |index| ResponseKey::Entity {
            index,
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        let mut res = super::aggregate_responses(
            vec![
                map_response(
                    http_response(200, r#"plain text"#),
                    entity_key(0),
                    connector.clone(),
                    supergraph_request.clone(),
                )
                .await,
                map_response(
                    http_response(404, r#"{"error":"not found"}"#),
                    entity_key(1),
                    connector.clone(),
                    supergraph_request.clone(),
                )
                .await,
                map_response(
                    http_response(200, r#"{"data":{"id":"2"}}"#),
                    entity_key(2),
                    connector.clone(),
                    supergraph_request.clone(),
                )
                .await,
                map_response(
                    http_response(500, r#"{"error":"whoops"}"#),
                    entity_key(3),
                    connector,
                    supergraph_request,
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap();

        // Replace every error id with the null id so the snapshot below is
        // stable (it expects the all-zero `apollo_id`).
        let body = res.response.body_mut();
        body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "_entities": Array([
                            Null,
                            Null,
                            Object({
                                "id": String(
                                    "2",
                                ),
                            }),
                            Null,
                        ]),
                    }),
                ),
                path: None,
                errors: [
                    Error {
                        message: "The server returned data in an unexpected format.",
                        locations: [],
                        path: Some(
                            Path(
                                [
                                    Key(
                                        "_entities",
                                        None,
                                    ),
                                    Index(
                                        0,
                                    ),
                                ],
                            ),
                        ),
                        extensions: {
                            "code": String(
                                "CONNECTOR_RESPONSE_INVALID",
                            ),
                            "service": String(
                                "subgraph_name",
                            ),
                            "connector": Object({
                                "coordinate": String(
                                    "subgraph_name:Query.user[0]",
                                ),
                            }),
                            "http": Object({
                                "status": Number(200),
                            }),
                            "apollo.private.subgraph.name": String(
                                "subgraph_name",
                            ),
                        },
                        apollo_id: 00000000-0000-0000-0000-000000000000,
                    },
                    Error {
                        message: "Request failed",
                        locations: [],
                        path: Some(
                            Path(
                                [
                                    Key(
                                        "_entities",
                                        None,
                                    ),
                                    Index(
                                        1,
                                    ),
                                ],
                            ),
                        ),
                        extensions: {
                            "code": String(
                                "CONNECTOR_FETCH",
                            ),
                            "service": String(
                                "subgraph_name",
                            ),
                            "connector": Object({
                                "coordinate": String(
                                    "subgraph_name:Query.user[0]",
                                ),
                            }),
                            "http": Object({
                                "status": Number(404),
                            }),
                            "apollo.private.subgraph.name": String(
                                "subgraph_name",
                            ),
                        },
                        apollo_id: 00000000-0000-0000-0000-000000000000,
                    },
                    Error {
                        message: "Request failed",
                        locations: [],
                        path: Some(
                            Path(
                                [
                                    Key(
                                        "_entities",
                                        None,
                                    ),
                                    Index(
                                        3,
                                    ),
                                ],
                            ),
                        ),
                        extensions: {
                            "code": String(
                                "CONNECTOR_FETCH",
                            ),
                            "service": String(
                                "subgraph_name",
                            ),
                            "connector": Object({
                                "coordinate": String(
                                    "subgraph_name:Query.user[0]",
                                ),
                            }),
                            "http": Object({
                                "status": Number(500),
                            }),
                            "apollo.private.subgraph.name": String(
                                "subgraph_name",
                            ),
                        },
                        apollo_id: 00000000-0000-0000-0000-000000000000,
                    },
                ],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// The `$status` selection maps the HTTP status code into the data.
    #[tokio::test]
    async fn test_handle_responses_status() {
        let mut connector = test_connector(
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(hello),
                None,
                0,
            ),
            "$status",
            None,
        );
        connector.response_variable_keys =
            IndexMap::from_iter([(Namespace::Status, Default::default())]);
        let connector = Arc::new(connector);

        let response_key1 = ResponseKey::RootField {
            name: "hello".to_string(),
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
        };

        let res = super::aggregate_responses(
            vec![
                map_response(
                    http_response(201, r#"{}"#),
                    response_key1,
                    connector,
                    empty_supergraph_request(),
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap();

        assert_debug_snapshot!(res.response, @r#"
        Response {
            status: 200,
            version: HTTP/1.1,
            headers: {},
            body: Response {
                label: None,
                data: Some(
                    Object({
                        "hello": Number(201),
                    }),
                ),
                path: None,
                errors: [],
                extensions: {},
                has_next: None,
                subscribed: None,
                created_at: None,
                incremental: [],
            },
        }
        "#);
    }

    /// With `connect_is_success` set to `$status ->eq(400)`, a 201 response is
    /// reported as an error while a 400 response is mapped as data.
    #[tokio::test]
    async fn test_handle_response_with_is_success() {
        let is_success = JSONSelection::parse("$status ->eq(400)").unwrap();
        let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
            message: Default::default(),
            source_extensions: Default::default(),
            connect_extensions: Default::default(),
            connect_is_success: Some(is_success),
        };

        let mut connector = test_connector(
            ConnectId::new(
                "subgraph_name".into(),
                None,
                name!(Query),
                name!(hello),
                None,
                0,
            ),
            "$status",
            None,
        );
        connector.response_variable_keys =
            IndexMap::from_iter([(Namespace::Status, Default::default())]);
        connector.error_settings = error_settings;
        let connector = Arc::new(connector);

        let status_key = || ResponseKey::RootField {
            name: "hello".to_string(),
            inputs: Default::default(),
            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
        };

        let supergraph_request = empty_supergraph_request();

        // 201 does not satisfy the is-success condition.
        let res_expect_fail = super::aggregate_responses(
            vec![
                map_response(
                    http_response(201, r#"{}"#),
                    status_key(),
                    connector.clone(),
                    supergraph_request.clone(),
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap()
        .response;
        assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
        assert_eq!(res_expect_fail.body().errors.len(), 1);

        // 400 satisfies the is-success condition.
        let res_expect_success = super::aggregate_responses(
            vec![
                map_response(
                    http_response(400, r#"{}"#),
                    status_key(),
                    connector,
                    supergraph_request,
                )
                .await,
            ],
            Context::new(),
        )
        .unwrap()
        .response;
        assert!(res_expect_success.body().errors.is_empty());
        assert_eq!(
            &res_expect_success.body().data,
            &Some(json!({"hello": json!(400)}))
        );
    }
}