1use std::sync::Arc;
2
3use apollo_compiler::ExecutableDocument;
4use apollo_compiler::validation::Valid;
5use apollo_federation::connectors::Connector;
6use apollo_federation::connectors::runtime::debug::ConnectorContext;
7use apollo_federation::connectors::runtime::debug::DebugRequest;
8use apollo_federation::connectors::runtime::debug::SelectionData;
9use apollo_federation::connectors::runtime::errors::Error;
10use apollo_federation::connectors::runtime::errors::RuntimeError;
11use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
12use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
13use apollo_federation::connectors::runtime::key::ResponseKey;
14use apollo_federation::connectors::runtime::mapping::Problem;
15use apollo_federation::connectors::runtime::responses::HandleResponseError;
16use apollo_federation::connectors::runtime::responses::MappedResponse;
17use apollo_federation::connectors::runtime::responses::deserialize_response;
18use apollo_federation::connectors::runtime::responses::handle_raw_response;
19use axum::body::HttpBody;
20use http::response::Parts;
21use http_body_util::BodyExt;
22use http_body_util::LengthLimitError;
23use http_body_util::Limited;
24use opentelemetry::KeyValue;
25use parking_lot::Mutex;
26use serde_json_bytes::Map;
27use serde_json_bytes::Value;
28use tracing::Span;
29
30use crate::Context;
31use crate::graphql;
32use crate::json_ext::Path;
33use crate::plugins::limits::ConnectorResponseSizeLimit;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
35use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
36use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
37use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
38use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
39use crate::plugins::telemetry::config_new::events::log_event;
40use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
41use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
42use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
43use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
44use crate::services::connect::Response;
45use crate::services::connector;
46use crate::services::fetch::AddSubgraphNameExt;
47
48impl From<RuntimeError> for graphql::Error {
51 fn from(error: RuntimeError) -> Self {
52 let path: Path = (&error.path).into();
53
54 let mut err = graphql::Error::builder()
55 .message(&error.message)
56 .extensions(error.extensions())
57 .extension_code(error.code())
58 .path(path)
59 .build();
60
61 err.set_span_event_emitted(error.span_event_emitted());
66
67 if let Some(subgraph_name) = &error.subgraph_name {
68 err.with_subgraph_name(subgraph_name)
69 } else {
70 err
71 }
72 }
73}
74
75#[allow(clippy::too_many_arguments)]
78pub(crate) async fn process_response<T>(
79 result: Result<http::Response<T>, Error>,
80 response_key: ResponseKey,
81 connector: Arc<Connector>,
82 context: &Context,
83 debug_request: DebugRequest,
84 debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
85 supergraph_request: Arc<http::Request<crate::graphql::Request>>,
86 operation: Option<Arc<Valid<ExecutableDocument>>>,
87) -> connector::request_service::Response
88where
89 T: HttpBody,
90 T::Error: Into<tower::BoxError>,
91{
92 let (mut mapped_response, result) = match result {
93 Err(error) => {
95 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
96 (
97 MappedResponse::Error {
98 error: error.to_runtime_error(&connector, &response_key),
99 key: response_key,
100 problems: Vec::new(),
101 },
102 Err(error),
103 )
104 }
105 Ok(response) => {
106 let (parts, body) = response.into_parts();
107
108 let result = Ok(TransportResponse::Http(HttpResponse {
109 inner: parts.clone(),
110 }));
111
112 let make_err = |message: String, code: &str| -> Box<RuntimeError> {
113 let mut err = RuntimeError::new(message, &response_key);
114 err.subgraph_name = Some(connector.id.subgraph_name.clone());
115 err = err.with_code(code);
116 err.coordinate = Some(connector.id.coordinate());
117 err = err.extension(
118 "http",
119 Value::Object(Map::from_iter([(
120 "status".into(),
121 Value::Number(parts.status.as_u16().into()),
122 )])),
123 );
124 Box::new(err)
125 };
126
127 let make_invalid_response_err = || {
128 make_err(
129 "The server returned data in an unexpected format.".to_string(),
130 "CONNECTOR_RESPONSE_INVALID",
131 )
132 };
133
134 let make_limit_err = |limit: usize| {
135 make_err(
136 format!("connector response body exceeded limit of {limit} bytes"),
137 "CONNECTOR_RESPONSE_SIZE_LIMIT_EXCEEDED",
138 )
139 };
140
141 let response_size_limit = context
142 .extensions()
143 .with_lock(|e| e.get::<ConnectorResponseSizeLimit>().copied());
144
145 let body_result: Result<_, Box<RuntimeError>> = match response_size_limit {
146 Some(ConnectorResponseSizeLimit(limit)) => {
147 Limited::new(body, limit)
148 .collect()
149 .await
150 .map_err(|e| {
151 if e.downcast_ref::<LengthLimitError>().is_some() {
152 u64_counter!(
153 "apollo.router.limits.connector_response_size.exceeded",
154 "Number of connector responses aborted because they exceeded the configured response size limit",
155 1,
156 "connector.source" = connector.source_config_key()
157 );
158 tracing::Span::current()
159 .record("apollo.connector.response.aborted", "response_size_limit");
160 make_limit_err(limit)
161 } else {
162 make_invalid_response_err()
163 }
164 })
165 }
166 None => body
167 .collect()
168 .await
169 .map_err(|_| make_invalid_response_err()),
170 };
171
172 let deserialized_body = body_result.and_then(|body| {
173 let body = body.to_bytes();
174 let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
175 if let Some(debug_context) = debug_context {
176 debug_context.lock().push_invalid_response(
177 debug_request.0.clone(),
178 &parts,
179 &body,
180 &connector.error_settings,
181 debug_request.1.clone(),
182 );
183 }
184 make_invalid_response_err()
185 });
186 log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
187 raw
188 });
189
190 let mapped = match &deserialized_body {
194 Err(error) => MappedResponse::Error {
195 error: error.as_ref().clone(),
196 key: response_key,
197 problems: Vec::new(),
198 },
199 Ok(data) => handle_raw_response(
200 data,
201 &parts,
202 response_key,
203 &connector,
204 context,
205 supergraph_request.headers(),
206 )
207 .apply_operation(
208 operation
209 .as_ref()
210 .map(|arc_valid_doc| arc_valid_doc.as_ref().as_ref()),
211 &connector.schema_subtypes_map,
212 ),
213 };
214
215 if let Some(debug) = debug_context {
216 let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
217 debug_problems.extend(debug_request.1);
218
219 let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
220 Some(SelectionData {
221 source: connector.selection.to_string(),
222 transformed: key.selection().to_string(),
223 result: Some(data.clone()),
224 })
225 } else {
226 None
227 };
228
229 debug.lock().push_response(
230 debug_request.0,
231 &parts,
232 deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
233 selection_data,
234 &connector.error_settings,
235 debug_problems,
236 );
237 }
238 if matches!(mapped, MappedResponse::Data { .. }) {
239 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
240 } else {
241 Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
242 }
243
244 (mapped, result)
245 }
246 };
247
248 if let MappedResponse::Error { ref mut error, .. } = mapped_response {
249 emit_error_event(error.code(), &error.message, Some((*error.path).into()));
254 error.set_span_event_emitted(true);
255 }
256
257 connector::request_service::Response {
258 context: context.clone(),
259 subgraph_name: connector.id.subgraph_name.to_string(),
260 transport_result: result,
261 mapped_response,
262 }
263}
264
265pub(crate) fn aggregate_responses(
266 responses: Vec<MappedResponse>,
267 _context: Context,
268) -> Result<Response, HandleResponseError> {
269 let mut data = serde_json_bytes::Map::new();
270 let mut errors = Vec::new();
271 let count = responses.len();
272
273 for mapped in responses {
274 mapped.add_to_data(&mut data, &mut errors, count)?;
275 }
276
277 let data = if data.is_empty() {
278 Value::Null
279 } else {
280 Value::Object(data)
281 };
282
283 Span::current().record(
284 OTEL_STATUS_CODE,
285 if errors.is_empty() {
286 OTEL_STATUS_CODE_OK
287 } else {
288 OTEL_STATUS_CODE_ERROR
289 },
290 );
291
292 Ok(Response {
293 response: http::Response::builder()
294 .body(
295 graphql::Response::builder()
296 .data(data)
297 .errors(errors.into_iter().map(|e| e.into()).collect())
298 .build(),
299 )
300 .unwrap(),
301 })
302}
303
304fn log_connectors_event(
305 context: &Context,
306 body: &[u8],
307 parts: &Parts,
308 response_key: ResponseKey,
309 connector: &Connector,
310) {
311 let log_response_level = context
312 .extensions()
313 .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
314 .and_then(|event| {
315 let response = connector::request_service::Response {
327 context: context.clone(),
328 subgraph_name: connector.id.subgraph_name.to_string(),
329 transport_result: Ok(TransportResponse::Http(HttpResponse {
330 inner: parts.clone(),
331 })),
332 mapped_response: MappedResponse::Data {
333 data: Value::Null,
334 key: response_key,
335 problems: vec![],
336 },
337 };
338 if event.condition.evaluate_response(&response) {
339 Some(event.level)
340 } else {
341 None
342 }
343 });
344
345 if let Some(level) = log_response_level {
346 let mut attrs = Vec::with_capacity(4);
347
348 let header_string = crate::services::header_masking::masked_headers_for_log(
349 context,
350 crate::services::header_masking::Direction::Response,
351 Some(connector.id.subgraph_name.as_str()),
352 &parts.headers,
353 );
354
355 attrs.push(KeyValue::new(
356 HTTP_RESPONSE_HEADERS,
357 opentelemetry::Value::String(header_string.into()),
358 ));
359 attrs.push(KeyValue::new(
360 HTTP_RESPONSE_STATUS,
361 opentelemetry::Value::String(format!("{}", parts.status).into()),
362 ));
363 attrs.push(KeyValue::new(
364 HTTP_RESPONSE_VERSION,
365 opentelemetry::Value::String(format!("{:?}", parts.version).into()),
366 ));
367 attrs.push(KeyValue::new(
368 HTTP_RESPONSE_BODY,
369 opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
370 ));
371
372 log_event(
373 level,
374 "connector.response",
375 attrs,
376 &format!("Response from connector {label:?}", label = connector.label),
377 );
378 }
379}
380
381#[cfg(test)]
382mod tests {
383 use std::sync::Arc;
384
385 use apollo_compiler::Schema;
386 use apollo_compiler::collections::IndexMap;
387 use apollo_compiler::name;
388 use apollo_compiler::response::JsonValue;
389 use apollo_federation::connectors::ConnectId;
390 use apollo_federation::connectors::ConnectSpec;
391 use apollo_federation::connectors::Connector;
392 use apollo_federation::connectors::ConnectorErrorsSettings;
393 use apollo_federation::connectors::EntityResolver;
394 use apollo_federation::connectors::HTTPMethod;
395 use apollo_federation::connectors::HttpJsonTransport;
396 use apollo_federation::connectors::JSONSelection;
397 use apollo_federation::connectors::Label;
398 use apollo_federation::connectors::Namespace;
399 use apollo_federation::connectors::runtime::errors::RuntimeError;
400 use apollo_federation::connectors::runtime::inputs::RequestInputs;
401 use apollo_federation::connectors::runtime::key::ResponseKey;
402 use insta::assert_debug_snapshot;
403 use itertools::Itertools;
404 use serde_json_bytes::json;
405
406 use crate::Context;
407 use crate::graphql;
408 use crate::plugins::connectors::handle_responses::process_response;
409 use crate::services::router;
410 use crate::services::router::body::RouterBody;
411
412 #[test]
413 fn from_runtime_error_transfers_span_event_emitted_flag() {
414 let response_key = ResponseKey::RootField {
415 name: "hello".to_string(),
416 inputs: Default::default(),
417 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
418 };
419
420 let not_emitted = RuntimeError::new("boom", &response_key);
424 let converted: graphql::Error = not_emitted.into();
425 assert!(
426 !converted.span_event_emitted(),
427 "errors that never emitted a span event must not be marked as emitted"
428 );
429
430 let mut emitted = RuntimeError::new("boom", &response_key);
433 emitted.set_span_event_emitted(true);
434 let converted: graphql::Error = emitted.into();
435 assert!(
436 converted.span_event_emitted(),
437 "errors whose source already emitted must stay marked as emitted"
438 );
439 }
440
441 #[tokio::test]
442 async fn test_handle_responses_root_fields() {
443 let connector = Arc::new(Connector {
444 spec: ConnectSpec::V0_1,
445 schema_subtypes_map: Default::default(),
446 id: ConnectId::new(
447 "subgraph_name".into(),
448 None,
449 name!(Query),
450 name!(hello),
451 None,
452 0,
453 ),
454 transport: Some(HttpJsonTransport {
455 source_template: "http://localhost/api".parse().ok(),
456 connect_template: "/path".parse().unwrap(),
457 ..Default::default()
458 }),
459 selection: JSONSelection::parse("$.data").unwrap(),
460 entity_resolver: None,
461 config: Default::default(),
462 max_requests: None,
463 batch_settings: None,
464 request_headers: Default::default(),
465 response_headers: Default::default(),
466 request_variable_keys: Default::default(),
467 response_variable_keys: Default::default(),
468 error_settings: Default::default(),
469 label: "test label".into(),
470 });
471
472 let response1: http::Response<RouterBody> = http::Response::builder()
473 .body(router::body::from_bytes(r#"{"data":"world"}"#))
474 .unwrap();
475 let response_key1 = ResponseKey::RootField {
476 name: "hello".to_string(),
477 inputs: Default::default(),
478 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
479 };
480
481 let response2 = http::Response::builder()
482 .body(router::body::from_bytes(r#"{"data":"world"}"#))
483 .unwrap();
484 let response_key2 = ResponseKey::RootField {
485 name: "hello2".to_string(),
486 inputs: Default::default(),
487 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
488 };
489
490 let supergraph_request = Arc::new(
491 http::Request::builder()
492 .body(graphql::Request::builder().build())
493 .unwrap(),
494 );
495
496 let res = super::aggregate_responses(
497 vec![
498 process_response(
499 Ok(response1),
500 response_key1,
501 connector.clone(),
502 &Context::default(),
503 (None, Default::default()),
504 None,
505 supergraph_request.clone(),
506 Default::default(),
507 )
508 .await
509 .mapped_response,
510 process_response(
511 Ok(response2),
512 response_key2,
513 connector,
514 &Context::default(),
515 (None, Default::default()),
516 None,
517 supergraph_request,
518 Default::default(),
519 )
520 .await
521 .mapped_response,
522 ],
523 Context::new(),
524 )
525 .unwrap();
526
527 assert_debug_snapshot!(res.response, @r#"
528 Response {
529 status: 200,
530 version: HTTP/1.1,
531 headers: {},
532 body: Response {
533 label: None,
534 data: Some(
535 Object({
536 "hello": String(
537 "world",
538 ),
539 "hello2": String(
540 "world",
541 ),
542 }),
543 ),
544 path: None,
545 errors: [],
546 extensions: {},
547 has_next: None,
548 subscribed: None,
549 created_at: None,
550 incremental: [],
551 },
552 }
553 "#);
554 }
555
556 #[tokio::test]
557 async fn test_handle_responses_entities() {
558 let connector = Arc::new(Connector {
559 spec: ConnectSpec::V0_1,
560 schema_subtypes_map: Default::default(),
561 id: ConnectId::new(
562 "subgraph_name".into(),
563 None,
564 name!(Query),
565 name!(user),
566 None,
567 0,
568 ),
569 transport: Some(HttpJsonTransport {
570 source_template: "http://localhost/api".parse().ok(),
571 connect_template: "/path".parse().unwrap(),
572 ..Default::default()
573 }),
574 selection: JSONSelection::parse("$.data { id }").unwrap(),
575 entity_resolver: Some(EntityResolver::Explicit),
576 config: Default::default(),
577 max_requests: None,
578 batch_settings: None,
579 request_headers: Default::default(),
580 response_headers: Default::default(),
581 request_variable_keys: Default::default(),
582 response_variable_keys: Default::default(),
583 error_settings: Default::default(),
584 label: "test label".into(),
585 });
586
587 let response1: http::Response<RouterBody> = http::Response::builder()
588 .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
589 .unwrap();
590 let response_key1 = ResponseKey::Entity {
591 index: 0,
592 inputs: Default::default(),
593 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
594 };
595
596 let response2 = http::Response::builder()
597 .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
598 .unwrap();
599 let response_key2 = ResponseKey::Entity {
600 index: 1,
601 inputs: Default::default(),
602 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
603 };
604
605 let supergraph_request = Arc::new(
606 http::Request::builder()
607 .body(graphql::Request::builder().build())
608 .unwrap(),
609 );
610
611 let res = super::aggregate_responses(
612 vec![
613 process_response(
614 Ok(response1),
615 response_key1,
616 connector.clone(),
617 &Context::default(),
618 (None, Default::default()),
619 None,
620 supergraph_request.clone(),
621 Default::default(),
622 )
623 .await
624 .mapped_response,
625 process_response(
626 Ok(response2),
627 response_key2,
628 connector,
629 &Context::default(),
630 (None, Default::default()),
631 None,
632 supergraph_request,
633 Default::default(),
634 )
635 .await
636 .mapped_response,
637 ],
638 Context::new(),
639 )
640 .unwrap();
641
642 assert_debug_snapshot!(res.response, @r#"
643 Response {
644 status: 200,
645 version: HTTP/1.1,
646 headers: {},
647 body: Response {
648 label: None,
649 data: Some(
650 Object({
651 "_entities": Array([
652 Object({
653 "id": String(
654 "1",
655 ),
656 }),
657 Object({
658 "id": String(
659 "2",
660 ),
661 }),
662 ]),
663 }),
664 ),
665 path: None,
666 errors: [],
667 extensions: {},
668 has_next: None,
669 subscribed: None,
670 created_at: None,
671 incremental: [],
672 },
673 }
674 "#);
675 }
676
677 #[tokio::test]
678 async fn test_handle_responses_batch() {
679 let connector = Arc::new(Connector {
680 spec: ConnectSpec::V0_2,
681 id: ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
682 schema_subtypes_map: Default::default(),
683 transport: Some(HttpJsonTransport {
684 source_template: "http://localhost/api".parse().ok(),
685 connect_template: "/path".parse().unwrap(),
686 method: HTTPMethod::Post,
687 body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
688 ..Default::default()
689 }),
690 selection: JSONSelection::parse("$.data { id name }").unwrap(),
691 entity_resolver: Some(EntityResolver::TypeBatch),
692 config: Default::default(),
693 max_requests: None,
694 batch_settings: None,
695 request_headers: Default::default(),
696 response_headers: Default::default(),
697 request_variable_keys: Default::default(),
698 response_variable_keys: Default::default(),
699 error_settings: Default::default(),
700 label: "test label".into(),
701 });
702
703 let keys = connector
704 .resolvable_key(
705 &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
706 .unwrap(),
707 )
708 .unwrap()
709 .unwrap();
710
711 let response1: http::Response<RouterBody> = http::Response::builder()
712 .body(router::body::from_bytes(
714 r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
715 ))
716 .unwrap();
717
718 let mut inputs: RequestInputs = RequestInputs::default();
719 let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
720 inputs.batch = representations
721 .as_array()
722 .unwrap()
723 .iter()
724 .map(|v| v.as_object().unwrap().clone())
725 .collect_vec();
726
727 let response_key1 = ResponseKey::BatchEntity {
728 selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
729 keys,
730 inputs,
731 };
732
733 let supergraph_request = Arc::new(
734 http::Request::builder()
735 .body(graphql::Request::builder().build())
736 .unwrap(),
737 );
738
739 let res = super::aggregate_responses(
740 vec![
741 process_response(
742 Ok(response1),
743 response_key1,
744 connector.clone(),
745 &Context::default(),
746 (None, Default::default()),
747 None,
748 supergraph_request,
749 Default::default(),
750 )
751 .await
752 .mapped_response,
753 ],
754 Context::new(),
755 )
756 .unwrap();
757
758 assert_debug_snapshot!(res.response, @r#"
759 Response {
760 status: 200,
761 version: HTTP/1.1,
762 headers: {},
763 body: Response {
764 label: None,
765 data: Some(
766 Object({
767 "_entities": Array([
768 Object({
769 "id": String(
770 "1",
771 ),
772 "name": String(
773 "A",
774 ),
775 }),
776 Object({
777 "id": String(
778 "2",
779 ),
780 "name": String(
781 "B",
782 ),
783 }),
784 ]),
785 }),
786 ),
787 path: None,
788 errors: [],
789 extensions: {},
790 has_next: None,
791 subscribed: None,
792 created_at: None,
793 incremental: [],
794 },
795 }
796 "#);
797 }
798
799 #[tokio::test]
800 async fn test_handle_responses_entity_field() {
801 let connector = Arc::new(Connector {
802 spec: ConnectSpec::V0_1,
803 schema_subtypes_map: Default::default(),
804 id: ConnectId::new(
805 "subgraph_name".into(),
806 None,
807 name!(User),
808 name!(field),
809 None,
810 0,
811 ),
812 transport: Some(HttpJsonTransport {
813 source_template: "http://localhost/api".parse().ok(),
814 connect_template: "/path".parse().unwrap(),
815 ..Default::default()
816 }),
817 selection: JSONSelection::parse("$.data").unwrap(),
818 entity_resolver: Some(EntityResolver::Implicit),
819 config: Default::default(),
820 max_requests: None,
821 batch_settings: None,
822 request_headers: Default::default(),
823 response_headers: Default::default(),
824 request_variable_keys: Default::default(),
825 response_variable_keys: Default::default(),
826 error_settings: Default::default(),
827 label: "test label".into(),
828 });
829
830 let response1: http::Response<RouterBody> = http::Response::builder()
831 .body(router::body::from_bytes(r#"{"data":"value1"}"#))
832 .unwrap();
833 let response_key1 = ResponseKey::EntityField {
834 index: 0,
835 inputs: Default::default(),
836 field_name: "field".to_string(),
837 typename: Some(name!("User")),
838 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
839 };
840
841 let response2 = http::Response::builder()
842 .body(router::body::from_bytes(r#"{"data":"value2"}"#))
843 .unwrap();
844 let response_key2 = ResponseKey::EntityField {
845 index: 1,
846 inputs: Default::default(),
847 field_name: "field".to_string(),
848 typename: Some(name!("User")),
849 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
850 };
851
852 let supergraph_request = Arc::new(
853 http::Request::builder()
854 .body(graphql::Request::builder().build())
855 .unwrap(),
856 );
857
858 let res = super::aggregate_responses(
859 vec![
860 process_response(
861 Ok(response1),
862 response_key1,
863 connector.clone(),
864 &Context::default(),
865 (None, Default::default()),
866 None,
867 supergraph_request.clone(),
868 Default::default(),
869 )
870 .await
871 .mapped_response,
872 process_response(
873 Ok(response2),
874 response_key2,
875 connector,
876 &Context::default(),
877 (None, Default::default()),
878 None,
879 supergraph_request,
880 Default::default(),
881 )
882 .await
883 .mapped_response,
884 ],
885 Context::new(),
886 )
887 .unwrap();
888
889 assert_debug_snapshot!(res.response, @r#"
890 Response {
891 status: 200,
892 version: HTTP/1.1,
893 headers: {},
894 body: Response {
895 label: None,
896 data: Some(
897 Object({
898 "_entities": Array([
899 Object({
900 "__typename": String(
901 "User",
902 ),
903 "field": String(
904 "value1",
905 ),
906 }),
907 Object({
908 "__typename": String(
909 "User",
910 ),
911 "field": String(
912 "value2",
913 ),
914 }),
915 ]),
916 }),
917 ),
918 path: None,
919 errors: [],
920 extensions: {},
921 has_next: None,
922 subscribed: None,
923 created_at: None,
924 incremental: [],
925 },
926 }
927 "#);
928 }
929
930 #[tokio::test]
931 async fn test_handle_responses_errors() {
932 let connector = Arc::new(Connector {
933 spec: ConnectSpec::V0_1,
934 schema_subtypes_map: Default::default(),
935 id: ConnectId::new(
936 "subgraph_name".into(),
937 None,
938 name!(Query),
939 name!(user),
940 None,
941 0,
942 ),
943 transport: Some(HttpJsonTransport {
944 source_template: "http://localhost/api".parse().ok(),
945 connect_template: "/path".parse().unwrap(),
946 ..Default::default()
947 }),
948 selection: JSONSelection::parse("$.data").unwrap(),
949 entity_resolver: Some(EntityResolver::Explicit),
950 config: Default::default(),
951 max_requests: None,
952 batch_settings: None,
953 request_headers: Default::default(),
954 response_headers: Default::default(),
955 request_variable_keys: Default::default(),
956 response_variable_keys: Default::default(),
957 error_settings: Default::default(),
958 label: "test label".into(),
959 });
960
961 let response_plaintext: http::Response<RouterBody> = http::Response::builder()
962 .body(router::body::from_bytes(r#"plain text"#))
963 .unwrap();
964 let response_key_plaintext = ResponseKey::Entity {
965 index: 0,
966 inputs: Default::default(),
967 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
968 };
969
970 let response1: http::Response<RouterBody> = http::Response::builder()
971 .status(404)
972 .body(router::body::from_bytes(r#"{"error":"not found"}"#))
973 .unwrap();
974 let response_key1 = ResponseKey::Entity {
975 index: 1,
976 inputs: Default::default(),
977 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
978 };
979
980 let response2 = http::Response::builder()
981 .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
982 .unwrap();
983 let response_key2 = ResponseKey::Entity {
984 index: 2,
985 inputs: Default::default(),
986 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
987 };
988
989 let response3: http::Response<RouterBody> = http::Response::builder()
990 .status(500)
991 .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
992 .unwrap();
993 let response_key3 = ResponseKey::Entity {
994 index: 3,
995 inputs: Default::default(),
996 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
997 };
998
999 let supergraph_request = Arc::new(
1000 http::Request::builder()
1001 .body(graphql::Request::builder().build())
1002 .unwrap(),
1003 );
1004
1005 let mut res = super::aggregate_responses(
1006 vec![
1007 process_response(
1008 Ok(response_plaintext),
1009 response_key_plaintext,
1010 connector.clone(),
1011 &Context::default(),
1012 (None, Default::default()),
1013 None,
1014 supergraph_request.clone(),
1015 Default::default(),
1016 )
1017 .await
1018 .mapped_response,
1019 process_response(
1020 Ok(response1),
1021 response_key1,
1022 connector.clone(),
1023 &Context::default(),
1024 (None, Default::default()),
1025 None,
1026 supergraph_request.clone(),
1027 Default::default(),
1028 )
1029 .await
1030 .mapped_response,
1031 process_response(
1032 Ok(response2),
1033 response_key2,
1034 connector.clone(),
1035 &Context::default(),
1036 (None, Default::default()),
1037 None,
1038 supergraph_request.clone(),
1039 Default::default(),
1040 )
1041 .await
1042 .mapped_response,
1043 process_response(
1044 Ok(response3),
1045 response_key3,
1046 connector,
1047 &Context::default(),
1048 (None, Default::default()),
1049 None,
1050 supergraph_request,
1051 Default::default(),
1052 )
1053 .await
1054 .mapped_response,
1055 ],
1056 Context::new(),
1057 )
1058 .unwrap();
1059
1060 let body = res.response.body_mut();
1064 body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();
1065
1066 assert_debug_snapshot!(res.response, @r#"
1067 Response {
1068 status: 200,
1069 version: HTTP/1.1,
1070 headers: {},
1071 body: Response {
1072 label: None,
1073 data: Some(
1074 Object({
1075 "_entities": Array([
1076 Null,
1077 Null,
1078 Object({
1079 "id": String(
1080 "2",
1081 ),
1082 }),
1083 Null,
1084 ]),
1085 }),
1086 ),
1087 path: None,
1088 errors: [
1089 Error {
1090 message: "The server returned data in an unexpected format.",
1091 locations: [],
1092 path: Some(
1093 Path(
1094 [
1095 Key(
1096 "_entities",
1097 None,
1098 ),
1099 Index(
1100 0,
1101 ),
1102 ],
1103 ),
1104 ),
1105 extensions: {
1106 "code": String(
1107 "CONNECTOR_RESPONSE_INVALID",
1108 ),
1109 "service": String(
1110 "subgraph_name",
1111 ),
1112 "connector": Object({
1113 "coordinate": String(
1114 "subgraph_name:Query.user[0]",
1115 ),
1116 }),
1117 "http": Object({
1118 "status": Number(200),
1119 }),
1120 "apollo.private.subgraph.name": String(
1121 "subgraph_name",
1122 ),
1123 },
1124 apollo_id: 00000000-0000-0000-0000-000000000000,
1125 span_event_emitted: true,
1126 },
1127 Error {
1128 message: "Request failed",
1129 locations: [],
1130 path: Some(
1131 Path(
1132 [
1133 Key(
1134 "_entities",
1135 None,
1136 ),
1137 Index(
1138 1,
1139 ),
1140 ],
1141 ),
1142 ),
1143 extensions: {
1144 "code": String(
1145 "CONNECTOR_FETCH",
1146 ),
1147 "service": String(
1148 "subgraph_name",
1149 ),
1150 "connector": Object({
1151 "coordinate": String(
1152 "subgraph_name:Query.user[0]",
1153 ),
1154 }),
1155 "http": Object({
1156 "status": Number(404),
1157 }),
1158 "apollo.private.subgraph.name": String(
1159 "subgraph_name",
1160 ),
1161 },
1162 apollo_id: 00000000-0000-0000-0000-000000000000,
1163 span_event_emitted: true,
1164 },
1165 Error {
1166 message: "Request failed",
1167 locations: [],
1168 path: Some(
1169 Path(
1170 [
1171 Key(
1172 "_entities",
1173 None,
1174 ),
1175 Index(
1176 3,
1177 ),
1178 ],
1179 ),
1180 ),
1181 extensions: {
1182 "code": String(
1183 "CONNECTOR_FETCH",
1184 ),
1185 "service": String(
1186 "subgraph_name",
1187 ),
1188 "connector": Object({
1189 "coordinate": String(
1190 "subgraph_name:Query.user[0]",
1191 ),
1192 }),
1193 "http": Object({
1194 "status": Number(500),
1195 }),
1196 "apollo.private.subgraph.name": String(
1197 "subgraph_name",
1198 ),
1199 },
1200 apollo_id: 00000000-0000-0000-0000-000000000000,
1201 span_event_emitted: true,
1202 },
1203 ],
1204 extensions: {},
1205 has_next: None,
1206 subscribed: None,
1207 created_at: None,
1208 incremental: [],
1209 },
1210 }
1211 "#);
1212 }
1213
1214 #[tokio::test]
1215 async fn test_handle_responses_status() {
1216 let selection = JSONSelection::parse("$status").unwrap();
1217 let connector = Arc::new(Connector {
1218 spec: ConnectSpec::V0_1,
1219 schema_subtypes_map: Default::default(),
1220 id: ConnectId::new(
1221 "subgraph_name".into(),
1222 None,
1223 name!(Query),
1224 name!(hello),
1225 None,
1226 0,
1227 ),
1228 transport: Some(HttpJsonTransport {
1229 source_template: "http://localhost/api".parse().ok(),
1230 connect_template: "/path".parse().unwrap(),
1231 ..Default::default()
1232 }),
1233 selection: selection.clone(),
1234 entity_resolver: None,
1235 config: Default::default(),
1236 max_requests: None,
1237 batch_settings: None,
1238 request_headers: Default::default(),
1239 response_headers: Default::default(),
1240 request_variable_keys: Default::default(),
1241 response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1242 error_settings: Default::default(),
1243 label: "test label".into(),
1244 });
1245
1246 let response1: http::Response<RouterBody> = http::Response::builder()
1247 .status(201)
1248 .body(router::body::from_bytes(r#"{}"#))
1249 .unwrap();
1250 let response_key1 = ResponseKey::RootField {
1251 name: "hello".to_string(),
1252 inputs: Default::default(),
1253 selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1254 };
1255
1256 let supergraph_request = Arc::new(
1257 http::Request::builder()
1258 .body(graphql::Request::builder().build())
1259 .unwrap(),
1260 );
1261
1262 let res = super::aggregate_responses(
1263 vec![
1264 process_response(
1265 Ok(response1),
1266 response_key1,
1267 connector,
1268 &Context::default(),
1269 (None, Default::default()),
1270 None,
1271 supergraph_request,
1272 Default::default(),
1273 )
1274 .await
1275 .mapped_response,
1276 ],
1277 Context::new(),
1278 )
1279 .unwrap();
1280
1281 assert_debug_snapshot!(res.response, @r#"
1282 Response {
1283 status: 200,
1284 version: HTTP/1.1,
1285 headers: {},
1286 body: Response {
1287 label: None,
1288 data: Some(
1289 Object({
1290 "hello": Number(201),
1291 }),
1292 ),
1293 path: None,
1294 errors: [],
1295 extensions: {},
1296 has_next: None,
1297 subscribed: None,
1298 created_at: None,
1299 incremental: [],
1300 },
1301 }
1302 "#);
1303 }
1304
1305 #[tokio::test]
1306 async fn test_handle_response_with_is_success() {
1307 let is_success = JSONSelection::parse("$status ->eq(400)").unwrap();
1308 let selection = JSONSelection::parse("$status").unwrap();
1309 let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
1310 message: Default::default(),
1311 source_extensions: Default::default(),
1312 connect_extensions: Default::default(),
1313 connect_is_success: Some(is_success.clone()),
1314 };
1315 let connector = Arc::new(Connector {
1316 spec: ConnectSpec::V0_1,
1317 schema_subtypes_map: Default::default(),
1318 id: ConnectId::new(
1319 "subgraph_name".into(),
1320 None,
1321 name!(Query),
1322 name!(hello),
1323 None,
1324 0,
1325 ),
1326 transport: Some(HttpJsonTransport {
1327 source_template: "http://localhost/api".parse().ok(),
1328 connect_template: "/path".parse().unwrap(),
1329 ..Default::default()
1330 }),
1331 selection: selection.clone(),
1332 entity_resolver: None,
1333 config: Default::default(),
1334 max_requests: None,
1335 batch_settings: None,
1336 request_headers: Default::default(),
1337 response_headers: Default::default(),
1338 request_variable_keys: Default::default(),
1339 response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1340 error_settings,
1341 label: Label::from("test label"),
1342 });
1343
1344 let response_fail: http::Response<RouterBody> = http::Response::builder()
1346 .status(201)
1347 .body(router::body::from_bytes(r#"{}"#))
1348 .unwrap();
1349 let response_fail_key = ResponseKey::RootField {
1350 name: "hello".to_string(),
1351 inputs: Default::default(),
1352 selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1353 };
1354
1355 let response_succeed: http::Response<RouterBody> = http::Response::builder()
1357 .status(400)
1358 .body(router::body::from_bytes(r#"{}"#))
1359 .unwrap();
1360 let response_succeed_key = ResponseKey::RootField {
1361 name: "hello".to_string(),
1362 inputs: Default::default(),
1363 selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1364 };
1365
1366 let supergraph_request = Arc::new(
1367 http::Request::builder()
1368 .body(graphql::Request::builder().build())
1369 .unwrap(),
1370 );
1371
1372 let res_expect_fail = super::aggregate_responses(
1374 vec![
1375 process_response(
1376 Ok(response_fail),
1377 response_fail_key,
1378 connector.clone(),
1379 &Context::default(),
1380 (None, Default::default()),
1381 None,
1382 supergraph_request.clone(),
1383 Default::default(),
1384 )
1385 .await
1386 .mapped_response,
1387 ],
1388 Context::new(),
1389 )
1390 .unwrap()
1391 .response;
1392 assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
1393 assert_eq!(res_expect_fail.body().errors.len(), 1);
1394
1395 let res_expect_success = super::aggregate_responses(
1397 vec![
1398 process_response(
1399 Ok(response_succeed),
1400 response_succeed_key,
1401 connector.clone(),
1402 &Context::default(),
1403 (None, Default::default()),
1404 None,
1405 supergraph_request.clone(),
1406 Default::default(),
1407 )
1408 .await
1409 .mapped_response,
1410 ],
1411 Context::new(),
1412 )
1413 .unwrap()
1414 .response;
1415 assert!(res_expect_success.body().errors.is_empty());
1416 assert_eq!(
1417 &res_expect_success.body().data,
1418 &Some(json!({"hello": json!(400)}))
1419 );
1420 }
1421
1422 fn make_connector() -> Arc<Connector> {
1423 Arc::new(Connector {
1424 spec: ConnectSpec::V0_1,
1425 schema_subtypes_map: Default::default(),
1426 id: ConnectId::new(
1427 "subgraph_name".into(),
1428 None,
1429 name!(Query),
1430 name!(hello),
1431 None,
1432 0,
1433 ),
1434 transport: Some(HttpJsonTransport {
1435 source_template: "http://localhost/api".parse().ok(),
1436 connect_template: "/path".parse().unwrap(),
1437 ..Default::default()
1438 }),
1439 selection: JSONSelection::parse("$.data").unwrap(),
1440 entity_resolver: None,
1441 config: Default::default(),
1442 max_requests: None,
1443 batch_settings: None,
1444 request_headers: Default::default(),
1445 response_headers: Default::default(),
1446 request_variable_keys: Default::default(),
1447 response_variable_keys: Default::default(),
1448 error_settings: Default::default(),
1449 label: "test label".into(),
1450 })
1451 }
1452
1453 fn make_supergraph_request() -> Arc<http::Request<graphql::Request>> {
1454 Arc::new(
1455 http::Request::builder()
1456 .body(graphql::Request::builder().build())
1457 .unwrap(),
1458 )
1459 }
1460
1461 #[tokio::test]
1462 async fn process_response_under_size_limit() {
1463 use crate::plugins::limits::ConnectorResponseSizeLimit;
1464
1465 let ctx = Context::new();
1466 ctx.extensions()
1467 .with_lock(|e| e.insert(ConnectorResponseSizeLimit(1000)));
1468
1469 let key = ResponseKey::RootField {
1470 name: "hello".to_string(),
1471 inputs: Default::default(),
1472 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1473 };
1474 let response = http::Response::builder()
1475 .body(router::body::from_bytes(r#"{"data":"world"}"#))
1476 .unwrap();
1477
1478 let result = process_response(
1479 Ok(response),
1480 key,
1481 make_connector(),
1482 &ctx,
1483 (None, Default::default()),
1484 None,
1485 make_supergraph_request(),
1486 Default::default(),
1487 )
1488 .await;
1489
1490 let graphql_response =
1491 super::aggregate_responses(vec![result.mapped_response], Context::new())
1492 .unwrap()
1493 .response;
1494 assert!(
1495 graphql_response.body().errors.is_empty(),
1496 "expected no errors when response is under the limit"
1497 );
1498 }
1499
1500 #[tokio::test]
1501 async fn process_response_exceeds_size_limit() {
1502 use crate::plugins::limits::ConnectorResponseSizeLimit;
1503
1504 let ctx = Context::new();
1505 ctx.extensions()
1507 .with_lock(|e| e.insert(ConnectorResponseSizeLimit(5)));
1508
1509 let key = ResponseKey::RootField {
1510 name: "hello".to_string(),
1511 inputs: Default::default(),
1512 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1513 };
1514 let response = http::Response::builder()
1515 .body(router::body::from_bytes(r#"{"data":"world"}"#))
1516 .unwrap();
1517
1518 let result = process_response(
1519 Ok(response),
1520 key,
1521 make_connector(),
1522 &ctx,
1523 (None, Default::default()),
1524 None,
1525 make_supergraph_request(),
1526 Default::default(),
1527 )
1528 .await;
1529
1530 let graphql_response =
1531 super::aggregate_responses(vec![result.mapped_response], Context::new())
1532 .unwrap()
1533 .response;
1534 let errors = &graphql_response.body().errors;
1535 assert!(!errors.is_empty(), "expected an error for exceeded limit");
1536 assert!(
1537 errors[0].message.contains("exceeded limit of 5 bytes"),
1538 "unexpected error message: {}",
1539 errors[0].message
1540 );
1541 }
1542
1543 #[tokio::test]
1555 async fn errors_as_data_maps_message_and_extensions_when_is_success_false() {
1556 let connector = Arc::new(Connector {
1557 spec: ConnectSpec::V0_2,
1558 schema_subtypes_map: Default::default(),
1559 id: ConnectId::new(
1560 "subgraph_name".into(),
1561 None,
1562 name!(Query),
1563 name!(hello),
1564 None,
1565 0,
1566 ),
1567 transport: Some(HttpJsonTransport {
1568 source_template: "http://localhost/api".parse().ok(),
1569 connect_template: "/path".parse().unwrap(),
1570 ..Default::default()
1571 }),
1572 selection: JSONSelection::parse("$.data").unwrap(),
1573 entity_resolver: None,
1574 config: Default::default(),
1575 max_requests: None,
1576 batch_settings: None,
1577 request_headers: Default::default(),
1578 response_headers: Default::default(),
1579 request_variable_keys: Default::default(),
1580 response_variable_keys: Default::default(),
1581 error_settings: ConnectorErrorsSettings {
1582 message: Some(JSONSelection::parse("error.message").unwrap()),
1583 connect_extensions: Some(
1584 JSONSelection::parse("code: error.code\nhint: error.hint").unwrap(),
1585 ),
1586 source_extensions: None,
1587 connect_is_success: Some(JSONSelection::parse("$status->eq(200)").unwrap()),
1588 },
1589 label: "test label".into(),
1590 });
1591
1592 let response: http::Response<RouterBody> = http::Response::builder()
1593 .status(500)
1594 .body(router::body::from_bytes(
1595 r#"{"error":{"message":"no good","code":"BAD_THING","hint":"try again"}}"#,
1596 ))
1597 .unwrap();
1598 let response_key = ResponseKey::RootField {
1599 name: "hello".to_string(),
1600 inputs: Default::default(),
1601 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1602 };
1603
1604 let supergraph_request = Arc::new(
1605 http::Request::builder()
1606 .body(graphql::Request::builder().build())
1607 .unwrap(),
1608 );
1609
1610 let result = super::aggregate_responses(
1611 vec![
1612 process_response(
1613 Ok(response),
1614 response_key,
1615 connector,
1616 &Context::default(),
1617 (None, Default::default()),
1618 None,
1619 supergraph_request,
1620 Default::default(),
1621 )
1622 .await
1623 .mapped_response,
1624 ],
1625 Context::new(),
1626 )
1627 .unwrap();
1628
1629 let errors = &result.response.body().errors;
1630 assert_eq!(
1631 errors.len(),
1632 1,
1633 "expected exactly one error, got: {errors:?}"
1634 );
1635 let error = &errors[0];
1636
1637 assert_eq!(
1638 error.message, "no good",
1639 "errors.message should be mapped from the response body"
1640 );
1641
1642 let code = error
1643 .extensions
1644 .get("code")
1645 .and_then(|v| v.as_str())
1646 .unwrap_or_default();
1647 assert_eq!(
1648 code, "BAD_THING",
1649 "errors.extensions.code should override default CONNECTOR_FETCH"
1650 );
1651
1652 let hint = error
1653 .extensions
1654 .get("hint")
1655 .and_then(|v| v.as_str())
1656 .unwrap_or_default();
1657 assert_eq!(
1658 hint, "try again",
1659 "errors.extensions.hint should be mapped from the response body"
1660 );
1661
1662 let http_status = error
1663 .extensions
1664 .get("http")
1665 .and_then(|v| v.as_object())
1666 .and_then(|m| m.get("status"))
1667 .and_then(|v| v.as_i64());
1668 assert_eq!(
1669 http_status,
1670 Some(500),
1671 "default extensions.http.status should be preserved alongside the mapped extensions"
1672 );
1673 }
1674
1675 #[tokio::test]
1685 async fn errors_as_data_deep_merges_nested_extensions_with_defaults() {
1686 let connector = Arc::new(Connector {
1687 spec: ConnectSpec::V0_2,
1688 schema_subtypes_map: Default::default(),
1689 id: ConnectId::new(
1690 "subgraph_name".into(),
1691 None,
1692 name!(Query),
1693 name!(hello),
1694 None,
1695 0,
1696 ),
1697 transport: Some(HttpJsonTransport {
1698 source_template: "http://localhost/api".parse().ok(),
1699 connect_template: "/path".parse().unwrap(),
1700 ..Default::default()
1701 }),
1702 selection: JSONSelection::parse("$.data").unwrap(),
1703 entity_resolver: None,
1704 config: Default::default(),
1705 max_requests: None,
1706 batch_settings: None,
1707 request_headers: Default::default(),
1708 response_headers: Default::default(),
1709 request_variable_keys: Default::default(),
1710 response_variable_keys: Default::default(),
1711 error_settings: ConnectorErrorsSettings {
1712 message: None,
1713 connect_extensions: Some(
1714 JSONSelection::parse("http: { myField: $(\"literal Value\") }").unwrap(),
1715 ),
1716 source_extensions: None,
1717 connect_is_success: Some(JSONSelection::parse("$status->eq(200)").unwrap()),
1718 },
1719 label: "test label".into(),
1720 });
1721
1722 let response: http::Response<RouterBody> = http::Response::builder()
1723 .status(500)
1724 .body(router::body::from_bytes(r#"{}"#))
1725 .unwrap();
1726 let response_key = ResponseKey::RootField {
1727 name: "hello".to_string(),
1728 inputs: Default::default(),
1729 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1730 };
1731
1732 let supergraph_request = Arc::new(
1733 http::Request::builder()
1734 .body(graphql::Request::builder().build())
1735 .unwrap(),
1736 );
1737
1738 let result = super::aggregate_responses(
1739 vec![
1740 process_response(
1741 Ok(response),
1742 response_key,
1743 connector,
1744 &Context::default(),
1745 (None, Default::default()),
1746 None,
1747 supergraph_request,
1748 Default::default(),
1749 )
1750 .await
1751 .mapped_response,
1752 ],
1753 Context::new(),
1754 )
1755 .unwrap();
1756
1757 let errors = &result.response.body().errors;
1758 assert_eq!(
1759 errors.len(),
1760 1,
1761 "expected exactly one error, got: {errors:?}"
1762 );
1763 let http = errors[0]
1764 .extensions
1765 .get("http")
1766 .and_then(|v| v.as_object())
1767 .expect("extensions.http should be an object");
1768
1769 assert_eq!(
1770 http.get("myField").and_then(|v| v.as_str()),
1771 Some("literal Value"),
1772 "user-supplied extensions.http.myField should appear in the response"
1773 );
1774 assert_eq!(
1775 http.get("status").and_then(|v| v.as_i64()),
1776 Some(500),
1777 "default extensions.http.status should be preserved when the user sets sibling keys under extensions.http"
1778 );
1779 }
1780
1781 #[tokio::test]
1788 async fn errors_as_data_deep_merges_nested_extensions_across_source_and_connect() {
1789 let connector = Arc::new(Connector {
1790 spec: ConnectSpec::V0_2,
1791 schema_subtypes_map: Default::default(),
1792 id: ConnectId::new(
1793 "subgraph_name".into(),
1794 None,
1795 name!(Query),
1796 name!(hello),
1797 None,
1798 0,
1799 ),
1800 transport: Some(HttpJsonTransport {
1801 source_template: "http://localhost/api".parse().ok(),
1802 connect_template: "/path".parse().unwrap(),
1803 ..Default::default()
1804 }),
1805 selection: JSONSelection::parse("$.data").unwrap(),
1806 entity_resolver: None,
1807 config: Default::default(),
1808 max_requests: None,
1809 batch_settings: None,
1810 request_headers: Default::default(),
1811 response_headers: Default::default(),
1812 request_variable_keys: Default::default(),
1813 response_variable_keys: Default::default(),
1814 error_settings: ConnectorErrorsSettings {
1815 message: None,
1816 source_extensions: Some(
1817 JSONSelection::parse("http: { fromSource: $(\"a\") }").unwrap(),
1818 ),
1819 connect_extensions: Some(
1820 JSONSelection::parse("http: { fromConnect: $(\"b\") }").unwrap(),
1821 ),
1822 connect_is_success: Some(JSONSelection::parse("$status->eq(200)").unwrap()),
1823 },
1824 label: "test label".into(),
1825 });
1826
1827 let response: http::Response<RouterBody> = http::Response::builder()
1828 .status(500)
1829 .body(router::body::from_bytes(r#"{}"#))
1830 .unwrap();
1831 let response_key = ResponseKey::RootField {
1832 name: "hello".to_string(),
1833 inputs: Default::default(),
1834 selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1835 };
1836
1837 let supergraph_request = Arc::new(
1838 http::Request::builder()
1839 .body(graphql::Request::builder().build())
1840 .unwrap(),
1841 );
1842
1843 let result = super::aggregate_responses(
1844 vec![
1845 process_response(
1846 Ok(response),
1847 response_key,
1848 connector,
1849 &Context::default(),
1850 (None, Default::default()),
1851 None,
1852 supergraph_request,
1853 Default::default(),
1854 )
1855 .await
1856 .mapped_response,
1857 ],
1858 Context::new(),
1859 )
1860 .unwrap();
1861
1862 let errors = &result.response.body().errors;
1863 assert_eq!(
1864 errors.len(),
1865 1,
1866 "expected exactly one error, got: {errors:?}"
1867 );
1868 let http = errors[0]
1869 .extensions
1870 .get("http")
1871 .and_then(|v| v.as_object())
1872 .expect("extensions.http should be an object");
1873
1874 assert_eq!(
1875 http.get("status").and_then(|v| v.as_i64()),
1876 Some(500),
1877 "default extensions.http.status should be preserved alongside source- and connect-supplied siblings"
1878 );
1879 assert_eq!(
1880 http.get("fromSource").and_then(|v| v.as_str()),
1881 Some("a"),
1882 "source_extensions sibling under extensions.http should survive the connect_extensions merge"
1883 );
1884 assert_eq!(
1885 http.get("fromConnect").and_then(|v| v.as_str()),
1886 Some("b"),
1887 "connect_extensions sibling under extensions.http should appear alongside the source sibling"
1888 );
1889 }
1890}