Skip to main content

apollo_router/plugins/connectors/
handle_responses.rs

1use std::sync::Arc;
2
3use apollo_compiler::ExecutableDocument;
4use apollo_compiler::validation::Valid;
5use apollo_federation::connectors::Connector;
6use apollo_federation::connectors::runtime::debug::ConnectorContext;
7use apollo_federation::connectors::runtime::debug::DebugRequest;
8use apollo_federation::connectors::runtime::debug::SelectionData;
9use apollo_federation::connectors::runtime::errors::Error;
10use apollo_federation::connectors::runtime::errors::RuntimeError;
11use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
12use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
13use apollo_federation::connectors::runtime::key::ResponseKey;
14use apollo_federation::connectors::runtime::mapping::Problem;
15use apollo_federation::connectors::runtime::responses::HandleResponseError;
16use apollo_federation::connectors::runtime::responses::MappedResponse;
17use apollo_federation::connectors::runtime::responses::deserialize_response;
18use apollo_federation::connectors::runtime::responses::handle_raw_response;
19use axum::body::HttpBody;
20use http::response::Parts;
21use http_body_util::BodyExt;
22use http_body_util::LengthLimitError;
23use http_body_util::Limited;
24use opentelemetry::KeyValue;
25use parking_lot::Mutex;
26use serde_json_bytes::Map;
27use serde_json_bytes::Value;
28use tracing::Span;
29
30use crate::Context;
31use crate::graphql;
32use crate::json_ext::Path;
33use crate::plugins::limits::ConnectorResponseSizeLimit;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
35use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
36use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
37use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
38use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
39use crate::plugins::telemetry::config_new::events::log_event;
40use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
41use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
42use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
43use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
44use crate::services::connect::Response;
45use crate::services::connector;
46use crate::services::fetch::AddSubgraphNameExt;
47
48// --- ERRORS ------------------------------------------------------------------
49
50impl From<RuntimeError> for graphql::Error {
51    fn from(error: RuntimeError) -> Self {
52        let path: Path = (&error.path).into();
53
54        let mut err = graphql::Error::builder()
55            .message(&error.message)
56            .extensions(error.extensions())
57            .extension_code(error.code())
58            .path(path)
59            .build();
60
61        // Carry over whether a span event was already emitted for this error at its source site
62        // (set by `process_response`). Errors that reach this conversion without emitting — e.g.
63        // coprocessor `Break` or traffic-shaping timeout/rate-limit — keep the flag `false` so the
64        // catch-all in `count_operation_errors` still emits exactly one event for them.
65        err.set_span_event_emitted(error.span_event_emitted());
66
67        if let Some(subgraph_name) = &error.subgraph_name {
68            err.with_subgraph_name(subgraph_name)
69        } else {
70            err
71        }
72    }
73}
74
75// --- handle_responses --------------------------------------------------------
76
77#[allow(clippy::too_many_arguments)]
78pub(crate) async fn process_response<T>(
79    result: Result<http::Response<T>, Error>,
80    response_key: ResponseKey,
81    connector: Arc<Connector>,
82    context: &Context,
83    debug_request: DebugRequest,
84    debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
85    supergraph_request: Arc<http::Request<crate::graphql::Request>>,
86    operation: Option<Arc<Valid<ExecutableDocument>>>,
87) -> connector::request_service::Response
88where
89    T: HttpBody,
90    T::Error: Into<tower::BoxError>,
91{
92    let (mut mapped_response, result) = match result {
93        // This occurs when we short-circuit the request when over the limit
94        Err(error) => {
95            Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
96            (
97                MappedResponse::Error {
98                    error: error.to_runtime_error(&connector, &response_key),
99                    key: response_key,
100                    problems: Vec::new(),
101                },
102                Err(error),
103            )
104        }
105        Ok(response) => {
106            let (parts, body) = response.into_parts();
107
108            let result = Ok(TransportResponse::Http(HttpResponse {
109                inner: parts.clone(),
110            }));
111
112            let make_err = |message: String, code: &str| -> Box<RuntimeError> {
113                let mut err = RuntimeError::new(message, &response_key);
114                err.subgraph_name = Some(connector.id.subgraph_name.clone());
115                err = err.with_code(code);
116                err.coordinate = Some(connector.id.coordinate());
117                err = err.extension(
118                    "http",
119                    Value::Object(Map::from_iter([(
120                        "status".into(),
121                        Value::Number(parts.status.as_u16().into()),
122                    )])),
123                );
124                Box::new(err)
125            };
126
127            let make_invalid_response_err = || {
128                make_err(
129                    "The server returned data in an unexpected format.".to_string(),
130                    "CONNECTOR_RESPONSE_INVALID",
131                )
132            };
133
134            let make_limit_err = |limit: usize| {
135                make_err(
136                    format!("connector response body exceeded limit of {limit} bytes"),
137                    "CONNECTOR_RESPONSE_SIZE_LIMIT_EXCEEDED",
138                )
139            };
140
141            let response_size_limit = context
142                .extensions()
143                .with_lock(|e| e.get::<ConnectorResponseSizeLimit>().copied());
144
145            let body_result: Result<_, Box<RuntimeError>> = match response_size_limit {
146                Some(ConnectorResponseSizeLimit(limit)) => {
147                    Limited::new(body, limit)
148                        .collect()
149                        .await
150                        .map_err(|e| {
151                            if e.downcast_ref::<LengthLimitError>().is_some() {
152                                u64_counter!(
153                                    "apollo.router.limits.connector_response_size.exceeded",
154                                    "Number of connector responses aborted because they exceeded the configured response size limit",
155                                    1,
156                                    "connector.source" = connector.source_config_key()
157                                );
158                                tracing::Span::current()
159                                    .record("apollo.connector.response.aborted", "response_size_limit");
160                                make_limit_err(limit)
161                            } else {
162                                make_invalid_response_err()
163                            }
164                        })
165                }
166                None => body
167                    .collect()
168                    .await
169                    .map_err(|_| make_invalid_response_err()),
170            };
171
172            let deserialized_body = body_result.and_then(|body| {
173                let body = body.to_bytes();
174                let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
175                    if let Some(debug_context) = debug_context {
176                        debug_context.lock().push_invalid_response(
177                            debug_request.0.clone(),
178                            &parts,
179                            &body,
180                            &connector.error_settings,
181                            debug_request.1.clone(),
182                        );
183                    }
184                    make_invalid_response_err()
185                });
186                log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
187                raw
188            });
189
190            // If this errors, it will write to the debug context because it
191            // has access to the raw bytes, so we can't write to it again
192            // in any RawResponse::Error branches.
193            let mapped = match &deserialized_body {
194                Err(error) => MappedResponse::Error {
195                    error: error.as_ref().clone(),
196                    key: response_key,
197                    problems: Vec::new(),
198                },
199                Ok(data) => handle_raw_response(
200                    data,
201                    &parts,
202                    response_key,
203                    &connector,
204                    context,
205                    supergraph_request.headers(),
206                )
207                .apply_operation(
208                    operation
209                        .as_ref()
210                        .map(|arc_valid_doc| arc_valid_doc.as_ref().as_ref()),
211                    &connector.schema_subtypes_map,
212                ),
213            };
214
215            if let Some(debug) = debug_context {
216                let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
217                debug_problems.extend(debug_request.1);
218
219                let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
220                    Some(SelectionData {
221                        source: connector.selection.to_string(),
222                        transformed: key.selection().to_string(),
223                        result: Some(data.clone()),
224                    })
225                } else {
226                    None
227                };
228
229                debug.lock().push_response(
230                    debug_request.0,
231                    &parts,
232                    deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
233                    selection_data,
234                    &connector.error_settings,
235                    debug_problems,
236                );
237            }
238            if matches!(mapped, MappedResponse::Data { .. }) {
239                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
240            } else {
241                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
242            }
243
244            (mapped, result)
245        }
246    };
247
248    if let MappedResponse::Error { ref mut error, .. } = mapped_response {
249        // Emit here so the event picks up the connector request-service span's attributes
250        // (coordinate, source, etc.). Mark the error as emitted so `From<RuntimeError>` carries
251        // the flag through and the centralized catch-all in `count_operation_errors` won't fire a
252        // duplicate.
253        emit_error_event(error.code(), &error.message, Some((*error.path).into()));
254        error.set_span_event_emitted(true);
255    }
256
257    connector::request_service::Response {
258        context: context.clone(),
259        subgraph_name: connector.id.subgraph_name.to_string(),
260        transport_result: result,
261        mapped_response,
262    }
263}
264
265pub(crate) fn aggregate_responses(
266    responses: Vec<MappedResponse>,
267    _context: Context,
268) -> Result<Response, HandleResponseError> {
269    let mut data = serde_json_bytes::Map::new();
270    let mut errors = Vec::new();
271    let count = responses.len();
272
273    for mapped in responses {
274        mapped.add_to_data(&mut data, &mut errors, count)?;
275    }
276
277    let data = if data.is_empty() {
278        Value::Null
279    } else {
280        Value::Object(data)
281    };
282
283    Span::current().record(
284        OTEL_STATUS_CODE,
285        if errors.is_empty() {
286            OTEL_STATUS_CODE_OK
287        } else {
288            OTEL_STATUS_CODE_ERROR
289        },
290    );
291
292    Ok(Response {
293        response: http::Response::builder()
294            .body(
295                graphql::Response::builder()
296                    .data(data)
297                    .errors(errors.into_iter().map(|e| e.into()).collect())
298                    .build(),
299            )
300            .unwrap(),
301    })
302}
303
304fn log_connectors_event(
305    context: &Context,
306    body: &[u8],
307    parts: &Parts,
308    response_key: ResponseKey,
309    connector: &Connector,
310) {
311    let log_response_level = context
312        .extensions()
313        .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
314        .and_then(|event| {
315            // TODO: evaluate if this is still needed now that we're cloning the body anyway
316            // Create a temporary response here so we can evaluate the condition. This response
317            // is missing any information about the mapped response, because we don't have that
318            // yet. This means that we cannot correctly evaluate any condition that relies on
319            // the mapped response data or mapping problems. But we can't wait until we do have
320            // that information, because this is the only place we have the body bytes (without
321            // making an expensive clone of the body). So we either need to not expose any
322            // selector which can be used as a condition that requires mapping information, or
323            // we must document that such selectors cannot be used as conditions on standard
324            // connectors events.
325
326            let response = connector::request_service::Response {
327                context: context.clone(),
328                subgraph_name: connector.id.subgraph_name.to_string(),
329                transport_result: Ok(TransportResponse::Http(HttpResponse {
330                    inner: parts.clone(),
331                })),
332                mapped_response: MappedResponse::Data {
333                    data: Value::Null,
334                    key: response_key,
335                    problems: vec![],
336                },
337            };
338            if event.condition.evaluate_response(&response) {
339                Some(event.level)
340            } else {
341                None
342            }
343        });
344
345    if let Some(level) = log_response_level {
346        let mut attrs = Vec::with_capacity(4);
347
348        let header_string = crate::services::header_masking::masked_headers_for_log(
349            context,
350            crate::services::header_masking::Direction::Response,
351            Some(connector.id.subgraph_name.as_str()),
352            &parts.headers,
353        );
354
355        attrs.push(KeyValue::new(
356            HTTP_RESPONSE_HEADERS,
357            opentelemetry::Value::String(header_string.into()),
358        ));
359        attrs.push(KeyValue::new(
360            HTTP_RESPONSE_STATUS,
361            opentelemetry::Value::String(format!("{}", parts.status).into()),
362        ));
363        attrs.push(KeyValue::new(
364            HTTP_RESPONSE_VERSION,
365            opentelemetry::Value::String(format!("{:?}", parts.version).into()),
366        ));
367        attrs.push(KeyValue::new(
368            HTTP_RESPONSE_BODY,
369            opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
370        ));
371
372        log_event(
373            level,
374            "connector.response",
375            attrs,
376            &format!("Response from connector {label:?}", label = connector.label),
377        );
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use std::sync::Arc;
384
385    use apollo_compiler::Schema;
386    use apollo_compiler::collections::IndexMap;
387    use apollo_compiler::name;
388    use apollo_compiler::response::JsonValue;
389    use apollo_federation::connectors::ConnectId;
390    use apollo_federation::connectors::ConnectSpec;
391    use apollo_federation::connectors::Connector;
392    use apollo_federation::connectors::ConnectorErrorsSettings;
393    use apollo_federation::connectors::EntityResolver;
394    use apollo_federation::connectors::HTTPMethod;
395    use apollo_federation::connectors::HttpJsonTransport;
396    use apollo_federation::connectors::JSONSelection;
397    use apollo_federation::connectors::Label;
398    use apollo_federation::connectors::Namespace;
399    use apollo_federation::connectors::runtime::errors::RuntimeError;
400    use apollo_federation::connectors::runtime::inputs::RequestInputs;
401    use apollo_federation::connectors::runtime::key::ResponseKey;
402    use insta::assert_debug_snapshot;
403    use itertools::Itertools;
404    use serde_json_bytes::json;
405
406    use crate::Context;
407    use crate::graphql;
408    use crate::plugins::connectors::handle_responses::process_response;
409    use crate::services::router;
410    use crate::services::router::body::RouterBody;
411
412    #[test]
413    fn from_runtime_error_transfers_span_event_emitted_flag() {
414        let response_key = ResponseKey::RootField {
415            name: "hello".to_string(),
416            inputs: Default::default(),
417            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
418        };
419
420        // An error that never had a span event emitted at its source (e.g. coprocessor `Break`
421        // or traffic-shaping timeout/rate-limit) must keep the flag `false` so the catch-all in
422        // `count_operation_errors` still emits exactly one event for it.
423        let not_emitted = RuntimeError::new("boom", &response_key);
424        let converted: graphql::Error = not_emitted.into();
425        assert!(
426            !converted.span_event_emitted(),
427            "errors that never emitted a span event must not be marked as emitted"
428        );
429
430        // An error whose source site already emitted (process_response sets this) must carry the
431        // flag through so the catch-all doesn't fire a duplicate.
432        let mut emitted = RuntimeError::new("boom", &response_key);
433        emitted.set_span_event_emitted(true);
434        let converted: graphql::Error = emitted.into();
435        assert!(
436            converted.span_event_emitted(),
437            "errors whose source already emitted must stay marked as emitted"
438        );
439    }
440
441    #[tokio::test]
442    async fn test_handle_responses_root_fields() {
443        let connector = Arc::new(Connector {
444            spec: ConnectSpec::V0_1,
445            schema_subtypes_map: Default::default(),
446            id: ConnectId::new(
447                "subgraph_name".into(),
448                None,
449                name!(Query),
450                name!(hello),
451                None,
452                0,
453            ),
454            transport: Some(HttpJsonTransport {
455                source_template: "http://localhost/api".parse().ok(),
456                connect_template: "/path".parse().unwrap(),
457                ..Default::default()
458            }),
459            selection: JSONSelection::parse("$.data").unwrap(),
460            entity_resolver: None,
461            config: Default::default(),
462            max_requests: None,
463            batch_settings: None,
464            request_headers: Default::default(),
465            response_headers: Default::default(),
466            request_variable_keys: Default::default(),
467            response_variable_keys: Default::default(),
468            error_settings: Default::default(),
469            label: "test label".into(),
470        });
471
472        let response1: http::Response<RouterBody> = http::Response::builder()
473            .body(router::body::from_bytes(r#"{"data":"world"}"#))
474            .unwrap();
475        let response_key1 = ResponseKey::RootField {
476            name: "hello".to_string(),
477            inputs: Default::default(),
478            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
479        };
480
481        let response2 = http::Response::builder()
482            .body(router::body::from_bytes(r#"{"data":"world"}"#))
483            .unwrap();
484        let response_key2 = ResponseKey::RootField {
485            name: "hello2".to_string(),
486            inputs: Default::default(),
487            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
488        };
489
490        let supergraph_request = Arc::new(
491            http::Request::builder()
492                .body(graphql::Request::builder().build())
493                .unwrap(),
494        );
495
496        let res = super::aggregate_responses(
497            vec![
498                process_response(
499                    Ok(response1),
500                    response_key1,
501                    connector.clone(),
502                    &Context::default(),
503                    (None, Default::default()),
504                    None,
505                    supergraph_request.clone(),
506                    Default::default(),
507                )
508                .await
509                .mapped_response,
510                process_response(
511                    Ok(response2),
512                    response_key2,
513                    connector,
514                    &Context::default(),
515                    (None, Default::default()),
516                    None,
517                    supergraph_request,
518                    Default::default(),
519                )
520                .await
521                .mapped_response,
522            ],
523            Context::new(),
524        )
525        .unwrap();
526
527        assert_debug_snapshot!(res.response, @r#"
528        Response {
529            status: 200,
530            version: HTTP/1.1,
531            headers: {},
532            body: Response {
533                label: None,
534                data: Some(
535                    Object({
536                        "hello": String(
537                            "world",
538                        ),
539                        "hello2": String(
540                            "world",
541                        ),
542                    }),
543                ),
544                path: None,
545                errors: [],
546                extensions: {},
547                has_next: None,
548                subscribed: None,
549                created_at: None,
550                incremental: [],
551            },
552        }
553        "#);
554    }
555
556    #[tokio::test]
557    async fn test_handle_responses_entities() {
558        let connector = Arc::new(Connector {
559            spec: ConnectSpec::V0_1,
560            schema_subtypes_map: Default::default(),
561            id: ConnectId::new(
562                "subgraph_name".into(),
563                None,
564                name!(Query),
565                name!(user),
566                None,
567                0,
568            ),
569            transport: Some(HttpJsonTransport {
570                source_template: "http://localhost/api".parse().ok(),
571                connect_template: "/path".parse().unwrap(),
572                ..Default::default()
573            }),
574            selection: JSONSelection::parse("$.data { id }").unwrap(),
575            entity_resolver: Some(EntityResolver::Explicit),
576            config: Default::default(),
577            max_requests: None,
578            batch_settings: None,
579            request_headers: Default::default(),
580            response_headers: Default::default(),
581            request_variable_keys: Default::default(),
582            response_variable_keys: Default::default(),
583            error_settings: Default::default(),
584            label: "test label".into(),
585        });
586
587        let response1: http::Response<RouterBody> = http::Response::builder()
588            .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
589            .unwrap();
590        let response_key1 = ResponseKey::Entity {
591            index: 0,
592            inputs: Default::default(),
593            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
594        };
595
596        let response2 = http::Response::builder()
597            .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
598            .unwrap();
599        let response_key2 = ResponseKey::Entity {
600            index: 1,
601            inputs: Default::default(),
602            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
603        };
604
605        let supergraph_request = Arc::new(
606            http::Request::builder()
607                .body(graphql::Request::builder().build())
608                .unwrap(),
609        );
610
611        let res = super::aggregate_responses(
612            vec![
613                process_response(
614                    Ok(response1),
615                    response_key1,
616                    connector.clone(),
617                    &Context::default(),
618                    (None, Default::default()),
619                    None,
620                    supergraph_request.clone(),
621                    Default::default(),
622                )
623                .await
624                .mapped_response,
625                process_response(
626                    Ok(response2),
627                    response_key2,
628                    connector,
629                    &Context::default(),
630                    (None, Default::default()),
631                    None,
632                    supergraph_request,
633                    Default::default(),
634                )
635                .await
636                .mapped_response,
637            ],
638            Context::new(),
639        )
640        .unwrap();
641
642        assert_debug_snapshot!(res.response, @r#"
643        Response {
644            status: 200,
645            version: HTTP/1.1,
646            headers: {},
647            body: Response {
648                label: None,
649                data: Some(
650                    Object({
651                        "_entities": Array([
652                            Object({
653                                "id": String(
654                                    "1",
655                                ),
656                            }),
657                            Object({
658                                "id": String(
659                                    "2",
660                                ),
661                            }),
662                        ]),
663                    }),
664                ),
665                path: None,
666                errors: [],
667                extensions: {},
668                has_next: None,
669                subscribed: None,
670                created_at: None,
671                incremental: [],
672            },
673        }
674        "#);
675    }
676
677    #[tokio::test]
678    async fn test_handle_responses_batch() {
679        let connector = Arc::new(Connector {
680            spec: ConnectSpec::V0_2,
681            id: ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
682            schema_subtypes_map: Default::default(),
683            transport: Some(HttpJsonTransport {
684                source_template: "http://localhost/api".parse().ok(),
685                connect_template: "/path".parse().unwrap(),
686                method: HTTPMethod::Post,
687                body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
688                ..Default::default()
689            }),
690            selection: JSONSelection::parse("$.data { id name }").unwrap(),
691            entity_resolver: Some(EntityResolver::TypeBatch),
692            config: Default::default(),
693            max_requests: None,
694            batch_settings: None,
695            request_headers: Default::default(),
696            response_headers: Default::default(),
697            request_variable_keys: Default::default(),
698            response_variable_keys: Default::default(),
699            error_settings: Default::default(),
700            label: "test label".into(),
701        });
702
703        let keys = connector
704            .resolvable_key(
705                &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
706                    .unwrap(),
707            )
708            .unwrap()
709            .unwrap();
710
711        let response1: http::Response<RouterBody> = http::Response::builder()
712            // different order from the request inputs
713            .body(router::body::from_bytes(
714                r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
715            ))
716            .unwrap();
717
718        let mut inputs: RequestInputs = RequestInputs::default();
719        let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
720        inputs.batch = representations
721            .as_array()
722            .unwrap()
723            .iter()
724            .map(|v| v.as_object().unwrap().clone())
725            .collect_vec();
726
727        let response_key1 = ResponseKey::BatchEntity {
728            selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
729            keys,
730            inputs,
731        };
732
733        let supergraph_request = Arc::new(
734            http::Request::builder()
735                .body(graphql::Request::builder().build())
736                .unwrap(),
737        );
738
739        let res = super::aggregate_responses(
740            vec![
741                process_response(
742                    Ok(response1),
743                    response_key1,
744                    connector.clone(),
745                    &Context::default(),
746                    (None, Default::default()),
747                    None,
748                    supergraph_request,
749                    Default::default(),
750                )
751                .await
752                .mapped_response,
753            ],
754            Context::new(),
755        )
756        .unwrap();
757
758        assert_debug_snapshot!(res.response, @r#"
759        Response {
760            status: 200,
761            version: HTTP/1.1,
762            headers: {},
763            body: Response {
764                label: None,
765                data: Some(
766                    Object({
767                        "_entities": Array([
768                            Object({
769                                "id": String(
770                                    "1",
771                                ),
772                                "name": String(
773                                    "A",
774                                ),
775                            }),
776                            Object({
777                                "id": String(
778                                    "2",
779                                ),
780                                "name": String(
781                                    "B",
782                                ),
783                            }),
784                        ]),
785                    }),
786                ),
787                path: None,
788                errors: [],
789                extensions: {},
790                has_next: None,
791                subscribed: None,
792                created_at: None,
793                incremental: [],
794            },
795        }
796        "#);
797    }
798
799    #[tokio::test]
800    async fn test_handle_responses_entity_field() {
801        let connector = Arc::new(Connector {
802            spec: ConnectSpec::V0_1,
803            schema_subtypes_map: Default::default(),
804            id: ConnectId::new(
805                "subgraph_name".into(),
806                None,
807                name!(User),
808                name!(field),
809                None,
810                0,
811            ),
812            transport: Some(HttpJsonTransport {
813                source_template: "http://localhost/api".parse().ok(),
814                connect_template: "/path".parse().unwrap(),
815                ..Default::default()
816            }),
817            selection: JSONSelection::parse("$.data").unwrap(),
818            entity_resolver: Some(EntityResolver::Implicit),
819            config: Default::default(),
820            max_requests: None,
821            batch_settings: None,
822            request_headers: Default::default(),
823            response_headers: Default::default(),
824            request_variable_keys: Default::default(),
825            response_variable_keys: Default::default(),
826            error_settings: Default::default(),
827            label: "test label".into(),
828        });
829
830        let response1: http::Response<RouterBody> = http::Response::builder()
831            .body(router::body::from_bytes(r#"{"data":"value1"}"#))
832            .unwrap();
833        let response_key1 = ResponseKey::EntityField {
834            index: 0,
835            inputs: Default::default(),
836            field_name: "field".to_string(),
837            typename: Some(name!("User")),
838            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
839        };
840
841        let response2 = http::Response::builder()
842            .body(router::body::from_bytes(r#"{"data":"value2"}"#))
843            .unwrap();
844        let response_key2 = ResponseKey::EntityField {
845            index: 1,
846            inputs: Default::default(),
847            field_name: "field".to_string(),
848            typename: Some(name!("User")),
849            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
850        };
851
852        let supergraph_request = Arc::new(
853            http::Request::builder()
854                .body(graphql::Request::builder().build())
855                .unwrap(),
856        );
857
858        let res = super::aggregate_responses(
859            vec![
860                process_response(
861                    Ok(response1),
862                    response_key1,
863                    connector.clone(),
864                    &Context::default(),
865                    (None, Default::default()),
866                    None,
867                    supergraph_request.clone(),
868                    Default::default(),
869                )
870                .await
871                .mapped_response,
872                process_response(
873                    Ok(response2),
874                    response_key2,
875                    connector,
876                    &Context::default(),
877                    (None, Default::default()),
878                    None,
879                    supergraph_request,
880                    Default::default(),
881                )
882                .await
883                .mapped_response,
884            ],
885            Context::new(),
886        )
887        .unwrap();
888
889        assert_debug_snapshot!(res.response, @r#"
890        Response {
891            status: 200,
892            version: HTTP/1.1,
893            headers: {},
894            body: Response {
895                label: None,
896                data: Some(
897                    Object({
898                        "_entities": Array([
899                            Object({
900                                "__typename": String(
901                                    "User",
902                                ),
903                                "field": String(
904                                    "value1",
905                                ),
906                            }),
907                            Object({
908                                "__typename": String(
909                                    "User",
910                                ),
911                                "field": String(
912                                    "value2",
913                                ),
914                            }),
915                        ]),
916                    }),
917                ),
918                path: None,
919                errors: [],
920                extensions: {},
921                has_next: None,
922                subscribed: None,
923                created_at: None,
924                incremental: [],
925            },
926        }
927        "#);
928    }
929
930    #[tokio::test]
931    async fn test_handle_responses_errors() {
932        let connector = Arc::new(Connector {
933            spec: ConnectSpec::V0_1,
934            schema_subtypes_map: Default::default(),
935            id: ConnectId::new(
936                "subgraph_name".into(),
937                None,
938                name!(Query),
939                name!(user),
940                None,
941                0,
942            ),
943            transport: Some(HttpJsonTransport {
944                source_template: "http://localhost/api".parse().ok(),
945                connect_template: "/path".parse().unwrap(),
946                ..Default::default()
947            }),
948            selection: JSONSelection::parse("$.data").unwrap(),
949            entity_resolver: Some(EntityResolver::Explicit),
950            config: Default::default(),
951            max_requests: None,
952            batch_settings: None,
953            request_headers: Default::default(),
954            response_headers: Default::default(),
955            request_variable_keys: Default::default(),
956            response_variable_keys: Default::default(),
957            error_settings: Default::default(),
958            label: "test label".into(),
959        });
960
961        let response_plaintext: http::Response<RouterBody> = http::Response::builder()
962            .body(router::body::from_bytes(r#"plain text"#))
963            .unwrap();
964        let response_key_plaintext = ResponseKey::Entity {
965            index: 0,
966            inputs: Default::default(),
967            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
968        };
969
970        let response1: http::Response<RouterBody> = http::Response::builder()
971            .status(404)
972            .body(router::body::from_bytes(r#"{"error":"not found"}"#))
973            .unwrap();
974        let response_key1 = ResponseKey::Entity {
975            index: 1,
976            inputs: Default::default(),
977            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
978        };
979
980        let response2 = http::Response::builder()
981            .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
982            .unwrap();
983        let response_key2 = ResponseKey::Entity {
984            index: 2,
985            inputs: Default::default(),
986            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
987        };
988
989        let response3: http::Response<RouterBody> = http::Response::builder()
990            .status(500)
991            .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
992            .unwrap();
993        let response_key3 = ResponseKey::Entity {
994            index: 3,
995            inputs: Default::default(),
996            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
997        };
998
999        let supergraph_request = Arc::new(
1000            http::Request::builder()
1001                .body(graphql::Request::builder().build())
1002                .unwrap(),
1003        );
1004
1005        let mut res = super::aggregate_responses(
1006            vec![
1007                process_response(
1008                    Ok(response_plaintext),
1009                    response_key_plaintext,
1010                    connector.clone(),
1011                    &Context::default(),
1012                    (None, Default::default()),
1013                    None,
1014                    supergraph_request.clone(),
1015                    Default::default(),
1016                )
1017                .await
1018                .mapped_response,
1019                process_response(
1020                    Ok(response1),
1021                    response_key1,
1022                    connector.clone(),
1023                    &Context::default(),
1024                    (None, Default::default()),
1025                    None,
1026                    supergraph_request.clone(),
1027                    Default::default(),
1028                )
1029                .await
1030                .mapped_response,
1031                process_response(
1032                    Ok(response2),
1033                    response_key2,
1034                    connector.clone(),
1035                    &Context::default(),
1036                    (None, Default::default()),
1037                    None,
1038                    supergraph_request.clone(),
1039                    Default::default(),
1040                )
1041                .await
1042                .mapped_response,
1043                process_response(
1044                    Ok(response3),
1045                    response_key3,
1046                    connector,
1047                    &Context::default(),
1048                    (None, Default::default()),
1049                    None,
1050                    supergraph_request,
1051                    Default::default(),
1052                )
1053                .await
1054                .mapped_response,
1055            ],
1056            Context::new(),
1057        )
1058        .unwrap();
1059
1060        // Overwrite error IDs to avoid random Uuid mismatch.
1061        // Since assert_debug_snapshot does not support redactions (which would be useful for error IDs),
1062        // we have to do it manually.
1063        let body = res.response.body_mut();
1064        body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();
1065
1066        assert_debug_snapshot!(res.response, @r#"
1067        Response {
1068            status: 200,
1069            version: HTTP/1.1,
1070            headers: {},
1071            body: Response {
1072                label: None,
1073                data: Some(
1074                    Object({
1075                        "_entities": Array([
1076                            Null,
1077                            Null,
1078                            Object({
1079                                "id": String(
1080                                    "2",
1081                                ),
1082                            }),
1083                            Null,
1084                        ]),
1085                    }),
1086                ),
1087                path: None,
1088                errors: [
1089                    Error {
1090                        message: "The server returned data in an unexpected format.",
1091                        locations: [],
1092                        path: Some(
1093                            Path(
1094                                [
1095                                    Key(
1096                                        "_entities",
1097                                        None,
1098                                    ),
1099                                    Index(
1100                                        0,
1101                                    ),
1102                                ],
1103                            ),
1104                        ),
1105                        extensions: {
1106                            "code": String(
1107                                "CONNECTOR_RESPONSE_INVALID",
1108                            ),
1109                            "service": String(
1110                                "subgraph_name",
1111                            ),
1112                            "connector": Object({
1113                                "coordinate": String(
1114                                    "subgraph_name:Query.user[0]",
1115                                ),
1116                            }),
1117                            "http": Object({
1118                                "status": Number(200),
1119                            }),
1120                            "apollo.private.subgraph.name": String(
1121                                "subgraph_name",
1122                            ),
1123                        },
1124                        apollo_id: 00000000-0000-0000-0000-000000000000,
1125                        span_event_emitted: true,
1126                    },
1127                    Error {
1128                        message: "Request failed",
1129                        locations: [],
1130                        path: Some(
1131                            Path(
1132                                [
1133                                    Key(
1134                                        "_entities",
1135                                        None,
1136                                    ),
1137                                    Index(
1138                                        1,
1139                                    ),
1140                                ],
1141                            ),
1142                        ),
1143                        extensions: {
1144                            "code": String(
1145                                "CONNECTOR_FETCH",
1146                            ),
1147                            "service": String(
1148                                "subgraph_name",
1149                            ),
1150                            "connector": Object({
1151                                "coordinate": String(
1152                                    "subgraph_name:Query.user[0]",
1153                                ),
1154                            }),
1155                            "http": Object({
1156                                "status": Number(404),
1157                            }),
1158                            "apollo.private.subgraph.name": String(
1159                                "subgraph_name",
1160                            ),
1161                        },
1162                        apollo_id: 00000000-0000-0000-0000-000000000000,
1163                        span_event_emitted: true,
1164                    },
1165                    Error {
1166                        message: "Request failed",
1167                        locations: [],
1168                        path: Some(
1169                            Path(
1170                                [
1171                                    Key(
1172                                        "_entities",
1173                                        None,
1174                                    ),
1175                                    Index(
1176                                        3,
1177                                    ),
1178                                ],
1179                            ),
1180                        ),
1181                        extensions: {
1182                            "code": String(
1183                                "CONNECTOR_FETCH",
1184                            ),
1185                            "service": String(
1186                                "subgraph_name",
1187                            ),
1188                            "connector": Object({
1189                                "coordinate": String(
1190                                    "subgraph_name:Query.user[0]",
1191                                ),
1192                            }),
1193                            "http": Object({
1194                                "status": Number(500),
1195                            }),
1196                            "apollo.private.subgraph.name": String(
1197                                "subgraph_name",
1198                            ),
1199                        },
1200                        apollo_id: 00000000-0000-0000-0000-000000000000,
1201                        span_event_emitted: true,
1202                    },
1203                ],
1204                extensions: {},
1205                has_next: None,
1206                subscribed: None,
1207                created_at: None,
1208                incremental: [],
1209            },
1210        }
1211        "#);
1212    }
1213
1214    #[tokio::test]
1215    async fn test_handle_responses_status() {
1216        let selection = JSONSelection::parse("$status").unwrap();
1217        let connector = Arc::new(Connector {
1218            spec: ConnectSpec::V0_1,
1219            schema_subtypes_map: Default::default(),
1220            id: ConnectId::new(
1221                "subgraph_name".into(),
1222                None,
1223                name!(Query),
1224                name!(hello),
1225                None,
1226                0,
1227            ),
1228            transport: Some(HttpJsonTransport {
1229                source_template: "http://localhost/api".parse().ok(),
1230                connect_template: "/path".parse().unwrap(),
1231                ..Default::default()
1232            }),
1233            selection: selection.clone(),
1234            entity_resolver: None,
1235            config: Default::default(),
1236            max_requests: None,
1237            batch_settings: None,
1238            request_headers: Default::default(),
1239            response_headers: Default::default(),
1240            request_variable_keys: Default::default(),
1241            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1242            error_settings: Default::default(),
1243            label: "test label".into(),
1244        });
1245
1246        let response1: http::Response<RouterBody> = http::Response::builder()
1247            .status(201)
1248            .body(router::body::from_bytes(r#"{}"#))
1249            .unwrap();
1250        let response_key1 = ResponseKey::RootField {
1251            name: "hello".to_string(),
1252            inputs: Default::default(),
1253            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1254        };
1255
1256        let supergraph_request = Arc::new(
1257            http::Request::builder()
1258                .body(graphql::Request::builder().build())
1259                .unwrap(),
1260        );
1261
1262        let res = super::aggregate_responses(
1263            vec![
1264                process_response(
1265                    Ok(response1),
1266                    response_key1,
1267                    connector,
1268                    &Context::default(),
1269                    (None, Default::default()),
1270                    None,
1271                    supergraph_request,
1272                    Default::default(),
1273                )
1274                .await
1275                .mapped_response,
1276            ],
1277            Context::new(),
1278        )
1279        .unwrap();
1280
1281        assert_debug_snapshot!(res.response, @r#"
1282        Response {
1283            status: 200,
1284            version: HTTP/1.1,
1285            headers: {},
1286            body: Response {
1287                label: None,
1288                data: Some(
1289                    Object({
1290                        "hello": Number(201),
1291                    }),
1292                ),
1293                path: None,
1294                errors: [],
1295                extensions: {},
1296                has_next: None,
1297                subscribed: None,
1298                created_at: None,
1299                incremental: [],
1300            },
1301        }
1302        "#);
1303    }
1304
1305    #[tokio::test]
1306    async fn test_handle_response_with_is_success() {
1307        let is_success = JSONSelection::parse("$status ->eq(400)").unwrap();
1308        let selection = JSONSelection::parse("$status").unwrap();
1309        let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
1310            message: Default::default(),
1311            source_extensions: Default::default(),
1312            connect_extensions: Default::default(),
1313            connect_is_success: Some(is_success.clone()),
1314        };
1315        let connector = Arc::new(Connector {
1316            spec: ConnectSpec::V0_1,
1317            schema_subtypes_map: Default::default(),
1318            id: ConnectId::new(
1319                "subgraph_name".into(),
1320                None,
1321                name!(Query),
1322                name!(hello),
1323                None,
1324                0,
1325            ),
1326            transport: Some(HttpJsonTransport {
1327                source_template: "http://localhost/api".parse().ok(),
1328                connect_template: "/path".parse().unwrap(),
1329                ..Default::default()
1330            }),
1331            selection: selection.clone(),
1332            entity_resolver: None,
1333            config: Default::default(),
1334            max_requests: None,
1335            batch_settings: None,
1336            request_headers: Default::default(),
1337            response_headers: Default::default(),
1338            request_variable_keys: Default::default(),
1339            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1340            error_settings,
1341            label: Label::from("test label"),
1342        });
1343
1344        // First request should be marked as error as status is NOT 400
1345        let response_fail: http::Response<RouterBody> = http::Response::builder()
1346            .status(201)
1347            .body(router::body::from_bytes(r#"{}"#))
1348            .unwrap();
1349        let response_fail_key = ResponseKey::RootField {
1350            name: "hello".to_string(),
1351            inputs: Default::default(),
1352            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1353        };
1354
1355        // Second response should be marked as a success as the status is 400!
1356        let response_succeed: http::Response<RouterBody> = http::Response::builder()
1357            .status(400)
1358            .body(router::body::from_bytes(r#"{}"#))
1359            .unwrap();
1360        let response_succeed_key = ResponseKey::RootField {
1361            name: "hello".to_string(),
1362            inputs: Default::default(),
1363            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1364        };
1365
1366        let supergraph_request = Arc::new(
1367            http::Request::builder()
1368                .body(graphql::Request::builder().build())
1369                .unwrap(),
1370        );
1371
1372        // Make failing request
1373        let res_expect_fail = super::aggregate_responses(
1374            vec![
1375                process_response(
1376                    Ok(response_fail),
1377                    response_fail_key,
1378                    connector.clone(),
1379                    &Context::default(),
1380                    (None, Default::default()),
1381                    None,
1382                    supergraph_request.clone(),
1383                    Default::default(),
1384                )
1385                .await
1386                .mapped_response,
1387            ],
1388            Context::new(),
1389        )
1390        .unwrap()
1391        .response;
1392        assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
1393        assert_eq!(res_expect_fail.body().errors.len(), 1);
1394
1395        // Make succeeding request
1396        let res_expect_success = super::aggregate_responses(
1397            vec![
1398                process_response(
1399                    Ok(response_succeed),
1400                    response_succeed_key,
1401                    connector.clone(),
1402                    &Context::default(),
1403                    (None, Default::default()),
1404                    None,
1405                    supergraph_request.clone(),
1406                    Default::default(),
1407                )
1408                .await
1409                .mapped_response,
1410            ],
1411            Context::new(),
1412        )
1413        .unwrap()
1414        .response;
1415        assert!(res_expect_success.body().errors.is_empty());
1416        assert_eq!(
1417            &res_expect_success.body().data,
1418            &Some(json!({"hello": json!(400)}))
1419        );
1420    }
1421
1422    fn make_connector() -> Arc<Connector> {
1423        Arc::new(Connector {
1424            spec: ConnectSpec::V0_1,
1425            schema_subtypes_map: Default::default(),
1426            id: ConnectId::new(
1427                "subgraph_name".into(),
1428                None,
1429                name!(Query),
1430                name!(hello),
1431                None,
1432                0,
1433            ),
1434            transport: Some(HttpJsonTransport {
1435                source_template: "http://localhost/api".parse().ok(),
1436                connect_template: "/path".parse().unwrap(),
1437                ..Default::default()
1438            }),
1439            selection: JSONSelection::parse("$.data").unwrap(),
1440            entity_resolver: None,
1441            config: Default::default(),
1442            max_requests: None,
1443            batch_settings: None,
1444            request_headers: Default::default(),
1445            response_headers: Default::default(),
1446            request_variable_keys: Default::default(),
1447            response_variable_keys: Default::default(),
1448            error_settings: Default::default(),
1449            label: "test label".into(),
1450        })
1451    }
1452
1453    fn make_supergraph_request() -> Arc<http::Request<graphql::Request>> {
1454        Arc::new(
1455            http::Request::builder()
1456                .body(graphql::Request::builder().build())
1457                .unwrap(),
1458        )
1459    }
1460
1461    #[tokio::test]
1462    async fn process_response_under_size_limit() {
1463        use crate::plugins::limits::ConnectorResponseSizeLimit;
1464
1465        let ctx = Context::new();
1466        ctx.extensions()
1467            .with_lock(|e| e.insert(ConnectorResponseSizeLimit(1000)));
1468
1469        let key = ResponseKey::RootField {
1470            name: "hello".to_string(),
1471            inputs: Default::default(),
1472            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1473        };
1474        let response = http::Response::builder()
1475            .body(router::body::from_bytes(r#"{"data":"world"}"#))
1476            .unwrap();
1477
1478        let result = process_response(
1479            Ok(response),
1480            key,
1481            make_connector(),
1482            &ctx,
1483            (None, Default::default()),
1484            None,
1485            make_supergraph_request(),
1486            Default::default(),
1487        )
1488        .await;
1489
1490        let graphql_response =
1491            super::aggregate_responses(vec![result.mapped_response], Context::new())
1492                .unwrap()
1493                .response;
1494        assert!(
1495            graphql_response.body().errors.is_empty(),
1496            "expected no errors when response is under the limit"
1497        );
1498    }
1499
1500    #[tokio::test]
1501    async fn process_response_exceeds_size_limit() {
1502        use crate::plugins::limits::ConnectorResponseSizeLimit;
1503
1504        let ctx = Context::new();
1505        // Limit of 5 bytes — well under the response body size
1506        ctx.extensions()
1507            .with_lock(|e| e.insert(ConnectorResponseSizeLimit(5)));
1508
1509        let key = ResponseKey::RootField {
1510            name: "hello".to_string(),
1511            inputs: Default::default(),
1512            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1513        };
1514        let response = http::Response::builder()
1515            .body(router::body::from_bytes(r#"{"data":"world"}"#))
1516            .unwrap();
1517
1518        let result = process_response(
1519            Ok(response),
1520            key,
1521            make_connector(),
1522            &ctx,
1523            (None, Default::default()),
1524            None,
1525            make_supergraph_request(),
1526            Default::default(),
1527        )
1528        .await;
1529
1530        let graphql_response =
1531            super::aggregate_responses(vec![result.mapped_response], Context::new())
1532                .unwrap()
1533                .response;
1534        let errors = &graphql_response.body().errors;
1535        assert!(!errors.is_empty(), "expected an error for exceeded limit");
1536        assert!(
1537            errors[0].message.contains("exceeded limit of 5 bytes"),
1538            "unexpected error message: {}",
1539            errors[0].message
1540        );
1541    }
1542
1543    // Reproduction for CNN-1095: when `isSuccess` returns false and the user has
1544    // configured `errors.message` and `errors.extensions`, the resulting GraphQL
1545    // error should use the mapped values (sourced from the response body) and
1546    // still expose the default `http.status` alongside them.
1547    //
1548    // Per the public docs at
1549    // https://www.apollographql.com/docs/graphos/connectors/responses/error-handling,
1550    // the `errors.message` mapping expression yields the error message and
1551    // `errors.extensions` is merged into `extensions` (overriding defaults like
1552    // `code` when keys collide, preserving defaults like `http.status` when they
1553    // don't).
1554    #[tokio::test]
1555    async fn errors_as_data_maps_message_and_extensions_when_is_success_false() {
1556        let connector = Arc::new(Connector {
1557            spec: ConnectSpec::V0_2,
1558            schema_subtypes_map: Default::default(),
1559            id: ConnectId::new(
1560                "subgraph_name".into(),
1561                None,
1562                name!(Query),
1563                name!(hello),
1564                None,
1565                0,
1566            ),
1567            transport: Some(HttpJsonTransport {
1568                source_template: "http://localhost/api".parse().ok(),
1569                connect_template: "/path".parse().unwrap(),
1570                ..Default::default()
1571            }),
1572            selection: JSONSelection::parse("$.data").unwrap(),
1573            entity_resolver: None,
1574            config: Default::default(),
1575            max_requests: None,
1576            batch_settings: None,
1577            request_headers: Default::default(),
1578            response_headers: Default::default(),
1579            request_variable_keys: Default::default(),
1580            response_variable_keys: Default::default(),
1581            error_settings: ConnectorErrorsSettings {
1582                message: Some(JSONSelection::parse("error.message").unwrap()),
1583                connect_extensions: Some(
1584                    JSONSelection::parse("code: error.code\nhint: error.hint").unwrap(),
1585                ),
1586                source_extensions: None,
1587                connect_is_success: Some(JSONSelection::parse("$status->eq(200)").unwrap()),
1588            },
1589            label: "test label".into(),
1590        });
1591
1592        let response: http::Response<RouterBody> = http::Response::builder()
1593            .status(500)
1594            .body(router::body::from_bytes(
1595                r#"{"error":{"message":"no good","code":"BAD_THING","hint":"try again"}}"#,
1596            ))
1597            .unwrap();
1598        let response_key = ResponseKey::RootField {
1599            name: "hello".to_string(),
1600            inputs: Default::default(),
1601            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1602        };
1603
1604        let supergraph_request = Arc::new(
1605            http::Request::builder()
1606                .body(graphql::Request::builder().build())
1607                .unwrap(),
1608        );
1609
1610        let result = super::aggregate_responses(
1611            vec![
1612                process_response(
1613                    Ok(response),
1614                    response_key,
1615                    connector,
1616                    &Context::default(),
1617                    (None, Default::default()),
1618                    None,
1619                    supergraph_request,
1620                    Default::default(),
1621                )
1622                .await
1623                .mapped_response,
1624            ],
1625            Context::new(),
1626        )
1627        .unwrap();
1628
1629        let errors = &result.response.body().errors;
1630        assert_eq!(
1631            errors.len(),
1632            1,
1633            "expected exactly one error, got: {errors:?}"
1634        );
1635        let error = &errors[0];
1636
1637        assert_eq!(
1638            error.message, "no good",
1639            "errors.message should be mapped from the response body"
1640        );
1641
1642        let code = error
1643            .extensions
1644            .get("code")
1645            .and_then(|v| v.as_str())
1646            .unwrap_or_default();
1647        assert_eq!(
1648            code, "BAD_THING",
1649            "errors.extensions.code should override default CONNECTOR_FETCH"
1650        );
1651
1652        let hint = error
1653            .extensions
1654            .get("hint")
1655            .and_then(|v| v.as_str())
1656            .unwrap_or_default();
1657        assert_eq!(
1658            hint, "try again",
1659            "errors.extensions.hint should be mapped from the response body"
1660        );
1661
1662        let http_status = error
1663            .extensions
1664            .get("http")
1665            .and_then(|v| v.as_object())
1666            .and_then(|m| m.get("status"))
1667            .and_then(|v| v.as_i64());
1668        assert_eq!(
1669            http_status,
1670            Some(500),
1671            "default extensions.http.status should be preserved alongside the mapped extensions"
1672        );
1673    }
1674
1675    // Reproduction for CNN-1095: when `errors.extensions` writes a nested key
1676    // that collides with a default extension (e.g. `http`), the public docs at
1677    // https://www.apollographql.com/docs/graphos/connectors/responses/error-handling
1678    // say the user-supplied values should be merged into the default object
1679    // (so `extensions.http.status` is preserved alongside `extensions.http.myField`).
1680    //
1681    // The current implementation in `runtime/responses.rs::map_error` calls
1682    // `error.extension("http", user_value)` after the default `http: { status }`
1683    // is set, which replaces the entire `http` object — so `status` is lost.
1684    #[tokio::test]
1685    async fn errors_as_data_deep_merges_nested_extensions_with_defaults() {
1686        let connector = Arc::new(Connector {
1687            spec: ConnectSpec::V0_2,
1688            schema_subtypes_map: Default::default(),
1689            id: ConnectId::new(
1690                "subgraph_name".into(),
1691                None,
1692                name!(Query),
1693                name!(hello),
1694                None,
1695                0,
1696            ),
1697            transport: Some(HttpJsonTransport {
1698                source_template: "http://localhost/api".parse().ok(),
1699                connect_template: "/path".parse().unwrap(),
1700                ..Default::default()
1701            }),
1702            selection: JSONSelection::parse("$.data").unwrap(),
1703            entity_resolver: None,
1704            config: Default::default(),
1705            max_requests: None,
1706            batch_settings: None,
1707            request_headers: Default::default(),
1708            response_headers: Default::default(),
1709            request_variable_keys: Default::default(),
1710            response_variable_keys: Default::default(),
1711            error_settings: ConnectorErrorsSettings {
1712                message: None,
1713                connect_extensions: Some(
1714                    JSONSelection::parse("http: { myField: $(\"literal Value\") }").unwrap(),
1715                ),
1716                source_extensions: None,
1717                connect_is_success: Some(JSONSelection::parse("$status->eq(200)").unwrap()),
1718            },
1719            label: "test label".into(),
1720        });
1721
1722        let response: http::Response<RouterBody> = http::Response::builder()
1723            .status(500)
1724            .body(router::body::from_bytes(r#"{}"#))
1725            .unwrap();
1726        let response_key = ResponseKey::RootField {
1727            name: "hello".to_string(),
1728            inputs: Default::default(),
1729            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1730        };
1731
1732        let supergraph_request = Arc::new(
1733            http::Request::builder()
1734                .body(graphql::Request::builder().build())
1735                .unwrap(),
1736        );
1737
1738        let result = super::aggregate_responses(
1739            vec![
1740                process_response(
1741                    Ok(response),
1742                    response_key,
1743                    connector,
1744                    &Context::default(),
1745                    (None, Default::default()),
1746                    None,
1747                    supergraph_request,
1748                    Default::default(),
1749                )
1750                .await
1751                .mapped_response,
1752            ],
1753            Context::new(),
1754        )
1755        .unwrap();
1756
1757        let errors = &result.response.body().errors;
1758        assert_eq!(
1759            errors.len(),
1760            1,
1761            "expected exactly one error, got: {errors:?}"
1762        );
1763        let http = errors[0]
1764            .extensions
1765            .get("http")
1766            .and_then(|v| v.as_object())
1767            .expect("extensions.http should be an object");
1768
1769        assert_eq!(
1770            http.get("myField").and_then(|v| v.as_str()),
1771            Some("literal Value"),
1772            "user-supplied extensions.http.myField should appear in the response"
1773        );
1774        assert_eq!(
1775            http.get("status").and_then(|v| v.as_i64()),
1776            Some(500),
1777            "default extensions.http.status should be preserved when the user sets sibling keys under extensions.http"
1778        );
1779    }
1780
1781    // Covers the nested-collision case across all three contributors to
1782    // `extensions`: the default (`http: { status }`), the source-level
1783    // `errors.extensions` mapping, and the connect-level `errors.extensions`
1784    // mapping. With deep-merge, sibling keys under a shared nested object
1785    // (`http`) from each layer should all survive — last-writer-wins only at
1786    // a leaf collision, not at the parent object level.
1787    #[tokio::test]
1788    async fn errors_as_data_deep_merges_nested_extensions_across_source_and_connect() {
1789        let connector = Arc::new(Connector {
1790            spec: ConnectSpec::V0_2,
1791            schema_subtypes_map: Default::default(),
1792            id: ConnectId::new(
1793                "subgraph_name".into(),
1794                None,
1795                name!(Query),
1796                name!(hello),
1797                None,
1798                0,
1799            ),
1800            transport: Some(HttpJsonTransport {
1801                source_template: "http://localhost/api".parse().ok(),
1802                connect_template: "/path".parse().unwrap(),
1803                ..Default::default()
1804            }),
1805            selection: JSONSelection::parse("$.data").unwrap(),
1806            entity_resolver: None,
1807            config: Default::default(),
1808            max_requests: None,
1809            batch_settings: None,
1810            request_headers: Default::default(),
1811            response_headers: Default::default(),
1812            request_variable_keys: Default::default(),
1813            response_variable_keys: Default::default(),
1814            error_settings: ConnectorErrorsSettings {
1815                message: None,
1816                source_extensions: Some(
1817                    JSONSelection::parse("http: { fromSource: $(\"a\") }").unwrap(),
1818                ),
1819                connect_extensions: Some(
1820                    JSONSelection::parse("http: { fromConnect: $(\"b\") }").unwrap(),
1821                ),
1822                connect_is_success: Some(JSONSelection::parse("$status->eq(200)").unwrap()),
1823            },
1824            label: "test label".into(),
1825        });
1826
1827        let response: http::Response<RouterBody> = http::Response::builder()
1828            .status(500)
1829            .body(router::body::from_bytes(r#"{}"#))
1830            .unwrap();
1831        let response_key = ResponseKey::RootField {
1832            name: "hello".to_string(),
1833            inputs: Default::default(),
1834            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1835        };
1836
1837        let supergraph_request = Arc::new(
1838            http::Request::builder()
1839                .body(graphql::Request::builder().build())
1840                .unwrap(),
1841        );
1842
1843        let result = super::aggregate_responses(
1844            vec![
1845                process_response(
1846                    Ok(response),
1847                    response_key,
1848                    connector,
1849                    &Context::default(),
1850                    (None, Default::default()),
1851                    None,
1852                    supergraph_request,
1853                    Default::default(),
1854                )
1855                .await
1856                .mapped_response,
1857            ],
1858            Context::new(),
1859        )
1860        .unwrap();
1861
1862        let errors = &result.response.body().errors;
1863        assert_eq!(
1864            errors.len(),
1865            1,
1866            "expected exactly one error, got: {errors:?}"
1867        );
1868        let http = errors[0]
1869            .extensions
1870            .get("http")
1871            .and_then(|v| v.as_object())
1872            .expect("extensions.http should be an object");
1873
1874        assert_eq!(
1875            http.get("status").and_then(|v| v.as_i64()),
1876            Some(500),
1877            "default extensions.http.status should be preserved alongside source- and connect-supplied siblings"
1878        );
1879        assert_eq!(
1880            http.get("fromSource").and_then(|v| v.as_str()),
1881            Some("a"),
1882            "source_extensions sibling under extensions.http should survive the connect_extensions merge"
1883        );
1884        assert_eq!(
1885            http.get("fromConnect").and_then(|v| v.as_str()),
1886            Some("b"),
1887            "connect_extensions sibling under extensions.http should appear alongside the source sibling"
1888        );
1889    }
1890}