apollo_router/plugins/connectors/
handle_responses.rs

1use std::sync::Arc;
2
3use apollo_federation::connectors::Connector;
4use apollo_federation::connectors::ProblemLocation;
5use apollo_federation::connectors::runtime::debug::ConnectorContext;
6use apollo_federation::connectors::runtime::debug::ConnectorDebugHttpRequest;
7use apollo_federation::connectors::runtime::errors::Error;
8use apollo_federation::connectors::runtime::errors::RuntimeError;
9use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
10use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
11use apollo_federation::connectors::runtime::key::ResponseKey;
12use apollo_federation::connectors::runtime::mapping::Problem;
13use apollo_federation::connectors::runtime::responses::HandleResponseError;
14use apollo_federation::connectors::runtime::responses::MappedResponse;
15use apollo_federation::connectors::runtime::responses::RawResponse;
16use apollo_federation::connectors::runtime::responses::handle_raw_response;
17use axum::body::HttpBody;
18use encoding_rs::Encoding;
19use encoding_rs::UTF_8;
20use http::header::CONTENT_LENGTH;
21use http::header::CONTENT_TYPE;
22use mime::Mime;
23use opentelemetry::KeyValue;
24use parking_lot::Mutex;
25use serde_json_bytes::Value;
26use tracing::Span;
27
28use crate::Context;
29use crate::graphql;
30use crate::json_ext::Path;
31use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
32use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
33use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
35use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
36use crate::plugins::telemetry::config_new::events::log_event;
37use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
38use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
39use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
40use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
41use crate::services::connect::Response;
42use crate::services::connector;
43use crate::services::fetch::AddSubgraphNameExt;
44use crate::services::router;
45
46// --- ERRORS ------------------------------------------------------------------
47
48impl From<RuntimeError> for graphql::Error {
49    fn from(error: RuntimeError) -> Self {
50        let path: Path = (&error.path).into();
51
52        let err = graphql::Error::builder()
53            .message(&error.message)
54            .extensions(error.extensions())
55            .extension_code(error.code())
56            .path(path)
57            .build();
58
59        if let Some(subgraph_name) = &error.subgraph_name {
60            err.with_subgraph_name(subgraph_name)
61        } else {
62            err
63        }
64    }
65}
66
67// --- handle_responses --------------------------------------------------------
68
69pub(crate) async fn process_response<T: HttpBody>(
70    result: Result<http::Response<T>, Error>,
71    response_key: ResponseKey,
72    connector: Arc<Connector>,
73    context: &Context,
74    debug_request: (
75        Option<Box<ConnectorDebugHttpRequest>>,
76        Vec<(ProblemLocation, Problem)>,
77    ),
78    debug_context: &Option<Arc<Mutex<ConnectorContext>>>,
79    supergraph_request: Arc<http::Request<crate::graphql::Request>>,
80) -> connector::request_service::Response {
81    let (mapped_response, result) = match result {
82        // This occurs when we short-circuit the request when over the limit
83        Err(error) => {
84            let raw = RawResponse::Error {
85                error: error.to_runtime_error(&connector, &response_key),
86                key: response_key,
87            };
88            Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
89            (
90                raw.map_error(
91                    &connector,
92                    context,
93                    debug_context,
94                    supergraph_request.headers(),
95                ),
96                Err(error),
97            )
98        }
99        Ok(response) => {
100            let (parts, body) = response.into_parts();
101
102            let result = Ok(TransportResponse::Http(HttpResponse {
103                inner: parts.clone(),
104            }));
105
106            // If this errors, it will write to the debug context because it
107            // has access to the raw bytes, so we can't write to it again
108            // in any RawResponse::Error branches.
109            let raw = match deserialize_response(
110                body,
111                &parts,
112                connector.clone(),
113                context,
114                &response_key,
115                debug_context,
116                &debug_request,
117            )
118            .await
119            {
120                Ok(data) => RawResponse::Data {
121                    parts,
122                    data,
123                    key: response_key,
124                    debug_request,
125                },
126                Err(error) => RawResponse::Error {
127                    error,
128                    key: response_key,
129                },
130            };
131
132            let (mapped, is_success) = handle_raw_response(
133                raw,
134                &connector,
135                context,
136                debug_context,
137                supergraph_request.headers(),
138            );
139
140            if is_success {
141                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
142            } else {
143                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
144            };
145
146            (mapped, result)
147        }
148    };
149
150    if let MappedResponse::Error { ref error, .. } = mapped_response {
151        emit_error_event(error.code(), &error.message, Some((*error.path).into()));
152    }
153
154    connector::request_service::Response {
155        context: context.clone(),
156        connector: connector.clone(),
157        transport_result: result,
158        mapped_response,
159    }
160}
161
162pub(crate) fn aggregate_responses(
163    responses: Vec<MappedResponse>,
164) -> Result<Response, HandleResponseError> {
165    let mut data = serde_json_bytes::Map::new();
166    let mut errors = Vec::new();
167    let count = responses.len();
168
169    for mapped in responses {
170        mapped.add_to_data(&mut data, &mut errors, count)?;
171    }
172
173    let data = if data.is_empty() {
174        Value::Null
175    } else {
176        Value::Object(data)
177    };
178
179    Span::current().record(
180        OTEL_STATUS_CODE,
181        if errors.is_empty() {
182            OTEL_STATUS_CODE_OK
183        } else {
184            OTEL_STATUS_CODE_ERROR
185        },
186    );
187
188    Ok(Response {
189        response: http::Response::builder()
190            .body(
191                graphql::Response::builder()
192                    .data(data)
193                    .errors(errors.into_iter().map(|e| e.into()).collect())
194                    .build(),
195            )
196            .unwrap(),
197    })
198}
199
200/// Converts the response body to bytes and deserializes it into a json Value.
201/// This is the last time we have access to the original bytes, so it's the only
202/// opportunity to write the invalid response to the debug context.
203async fn deserialize_response<T: HttpBody>(
204    body: T,
205    parts: &http::response::Parts,
206    connector: Arc<Connector>,
207    context: &Context,
208    response_key: &ResponseKey,
209    debug_context: &Option<Arc<Mutex<ConnectorContext>>>,
210    debug_request: &(
211        Option<Box<ConnectorDebugHttpRequest>>,
212        Vec<(ProblemLocation, Problem)>,
213    ),
214) -> Result<Value, RuntimeError> {
215    use serde_json_bytes::*;
216
217    let make_err = || {
218        let mut err = RuntimeError::new(
219            "The server returned data in an unexpected format.".to_string(),
220            response_key,
221        );
222        err.subgraph_name = Some(connector.id.subgraph_name.clone());
223        err = err.with_code("CONNECTOR_RESPONSE_INVALID");
224        err.coordinate = Some(connector.id.coordinate());
225        err = err.extension(
226            "http",
227            Value::Object(Map::from_iter([(
228                "status".into(),
229                Value::Number(parts.status.as_u16().into()),
230            )])),
231        );
232        err
233    };
234
235    let body = &router::body::into_bytes(body)
236        .await
237        .map_err(|_| make_err())?;
238
239    let log_response_level = context
240        .extensions()
241        .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
242        .and_then(|event| {
243            // Create a temporary response here so we can evaluate the condition. This response
244            // is missing any information about the mapped response, because we don't have that
245            // yet. This means that we cannot correctly evaluate any condition that relies on
246            // the mapped response data or mapping problems. But we can't wait until we do have
247            // that information, because this is the only place we have the body bytes (without
248            // making an expensive clone of the body). So we either need to not expose any
249            // selector which can be used as a condition that requires mapping information, or
250            // we must document that such selectors cannot be used as conditions on standard
251            // connectors events.
252
253            let response = connector::request_service::Response {
254                context: context.clone(),
255                connector: connector.clone(),
256                transport_result: Ok(TransportResponse::Http(HttpResponse {
257                    inner: parts.clone(),
258                })),
259                mapped_response: MappedResponse::Data {
260                    data: Value::Null,
261                    key: response_key.clone(),
262                    problems: vec![],
263                },
264            };
265            if event.condition.evaluate_response(&response) {
266                Some(event.level)
267            } else {
268                None
269            }
270        });
271
272    if let Some(level) = log_response_level {
273        let mut attrs = Vec::with_capacity(4);
274        #[cfg(test)]
275        let headers = {
276            let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
277                .headers
278                .clone()
279                .into_iter()
280                .filter_map(|(name, val)| Some((name?.to_string(), val)))
281                .collect();
282            headers.sort_keys();
283            headers
284        };
285        #[cfg(not(test))]
286        let headers = &parts.headers;
287
288        attrs.push(KeyValue::new(
289            HTTP_RESPONSE_HEADERS,
290            opentelemetry::Value::String(format!("{:?}", headers).into()),
291        ));
292        attrs.push(KeyValue::new(
293            HTTP_RESPONSE_STATUS,
294            opentelemetry::Value::String(format!("{}", parts.status).into()),
295        ));
296        attrs.push(KeyValue::new(
297            HTTP_RESPONSE_VERSION,
298            opentelemetry::Value::String(format!("{:?}", parts.version).into()),
299        ));
300        attrs.push(KeyValue::new(
301            HTTP_RESPONSE_BODY,
302            opentelemetry::Value::String(
303                String::from_utf8(body.clone().to_vec())
304                    .unwrap_or_default()
305                    .into(),
306            ),
307        ));
308
309        log_event(
310            level,
311            "connector.response",
312            attrs,
313            &format!("Response from connector {label:?}", label = connector.label),
314        );
315    }
316
317    // If the body is obviously empty, don't try to parse it
318    if let Some(content_length) = parts
319        .headers
320        .get(CONTENT_LENGTH)
321        .and_then(|len| len.to_str().ok())
322        .and_then(|s| s.parse::<usize>().ok())
323    {
324        if content_length == 0 {
325            return Ok(Value::Null);
326        }
327    }
328
329    let content_type = parts
330        .headers
331        .get(CONTENT_TYPE)
332        .and_then(|h| h.to_str().ok()?.parse::<Mime>().ok());
333
334    if content_type.is_none()
335        || content_type
336            .as_ref()
337            .is_some_and(|ct| ct.subtype() == mime::JSON || ct.suffix() == Some(mime::JSON))
338    {
339        // Treat any JSON-y like content types as JSON
340        // Also, because the HTTP spec says we should effectively "guess" the content type if there is no content type (None), we're going to guess it is JSON if the server has not specified one
341        match serde_json::from_slice::<Value>(body) {
342            Ok(json_data) => Ok(json_data),
343            Err(_) => {
344                if let Some(debug_context) = debug_context {
345                    debug_context.lock().push_invalid_response(
346                        debug_request.0.clone(),
347                        parts,
348                        body,
349                        &connector.error_settings,
350                        debug_request.1.clone(),
351                    );
352                }
353                Err(make_err())
354            }
355        }
356    } else if content_type
357        .as_ref()
358        .is_some_and(|ct| ct.type_() == mime::TEXT && ct.subtype() == mime::PLAIN)
359    {
360        // Plain text we can't parse as JSON so we'll instead return it as a JSON string
361        // Before we can do that, we need to figure out the charset and attempt to decode the string
362        let encoding = content_type
363            .as_ref()
364            .and_then(|ct| Encoding::for_label(ct.get_param("charset")?.as_str().as_bytes()))
365            .unwrap_or(UTF_8);
366        let (decoded_body, _, had_errors) = encoding.decode(body);
367
368        if had_errors {
369            if let Some(debug_context) = debug_context {
370                debug_context.lock().push_invalid_response(
371                    debug_request.0.clone(),
372                    parts,
373                    body,
374                    &connector.error_settings,
375                    debug_request.1.clone(),
376                );
377            }
378            return Err(make_err());
379        }
380
381        Ok(Value::String(decoded_body.into_owned().into()))
382    } else {
383        // For any other content types, all we can do is treat it as a JSON null cause we don't know what it is
384        Ok(Value::Null)
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use std::sync::Arc;
391
392    use apollo_compiler::Schema;
393    use apollo_compiler::collections::IndexMap;
394    use apollo_compiler::name;
395    use apollo_federation::connectors::ConnectId;
396    use apollo_federation::connectors::ConnectSpec;
397    use apollo_federation::connectors::Connector;
398    use apollo_federation::connectors::EntityResolver;
399    use apollo_federation::connectors::HTTPMethod;
400    use apollo_federation::connectors::HttpJsonTransport;
401    use apollo_federation::connectors::JSONSelection;
402    use apollo_federation::connectors::Namespace;
403    use apollo_federation::connectors::runtime::inputs::RequestInputs;
404    use apollo_federation::connectors::runtime::key::ResponseKey;
405    use insta::assert_debug_snapshot;
406    use itertools::Itertools;
407
408    use crate::Context;
409    use crate::graphql;
410    use crate::plugins::connectors::handle_responses::process_response;
411    use crate::services::router;
412    use crate::services::router::body::RouterBody;
413
414    #[tokio::test]
415    async fn test_handle_responses_root_fields() {
416        let connector = Arc::new(Connector {
417            spec: ConnectSpec::V0_1,
418            id: ConnectId::new(
419                "subgraph_name".into(),
420                None,
421                name!(Query),
422                name!(hello),
423                None,
424                0,
425            ),
426            transport: HttpJsonTransport {
427                source_template: "http://localhost/api".parse().ok(),
428                connect_template: "/path".parse().unwrap(),
429                ..Default::default()
430            },
431            selection: JSONSelection::parse("$.data").unwrap(),
432            entity_resolver: None,
433            config: Default::default(),
434            max_requests: None,
435            batch_settings: None,
436            request_headers: Default::default(),
437            response_headers: Default::default(),
438            request_variable_keys: Default::default(),
439            response_variable_keys: Default::default(),
440            error_settings: Default::default(),
441            label: "test label".into(),
442        });
443
444        let response1: http::Response<RouterBody> = http::Response::builder()
445            .body(router::body::from_bytes(r#"{"data":"world"}"#))
446            .unwrap();
447        let response_key1 = ResponseKey::RootField {
448            name: "hello".to_string(),
449            inputs: Default::default(),
450            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
451        };
452
453        let response2 = http::Response::builder()
454            .body(router::body::from_bytes(r#"{"data":"world"}"#))
455            .unwrap();
456        let response_key2 = ResponseKey::RootField {
457            name: "hello2".to_string(),
458            inputs: Default::default(),
459            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
460        };
461
462        let supergraph_request = Arc::new(
463            http::Request::builder()
464                .body(graphql::Request::builder().build())
465                .unwrap(),
466        );
467
468        let res = super::aggregate_responses(vec![
469            process_response(
470                Ok(response1),
471                response_key1,
472                connector.clone(),
473                &Context::default(),
474                (None, Default::default()),
475                &None,
476                supergraph_request.clone(),
477            )
478            .await
479            .mapped_response,
480            process_response(
481                Ok(response2),
482                response_key2,
483                connector,
484                &Context::default(),
485                (None, Default::default()),
486                &None,
487                supergraph_request,
488            )
489            .await
490            .mapped_response,
491        ])
492        .unwrap();
493
494        assert_debug_snapshot!(res, @r###"
495        Response {
496            response: Response {
497                status: 200,
498                version: HTTP/1.1,
499                headers: {},
500                body: Response {
501                    label: None,
502                    data: Some(
503                        Object({
504                            "hello": String(
505                                "world",
506                            ),
507                            "hello2": String(
508                                "world",
509                            ),
510                        }),
511                    ),
512                    path: None,
513                    errors: [],
514                    extensions: {},
515                    has_next: None,
516                    subscribed: None,
517                    created_at: None,
518                    incremental: [],
519                },
520            },
521        }
522        "###);
523    }
524
525    #[tokio::test]
526    async fn test_handle_responses_entities() {
527        let connector = Arc::new(Connector {
528            spec: ConnectSpec::V0_1,
529            id: ConnectId::new(
530                "subgraph_name".into(),
531                None,
532                name!(Query),
533                name!(user),
534                None,
535                0,
536            ),
537            transport: HttpJsonTransport {
538                source_template: "http://localhost/api".parse().ok(),
539                connect_template: "/path".parse().unwrap(),
540                ..Default::default()
541            },
542            selection: JSONSelection::parse("$.data { id }").unwrap(),
543            entity_resolver: Some(EntityResolver::Explicit),
544            config: Default::default(),
545            max_requests: None,
546            batch_settings: None,
547            request_headers: Default::default(),
548            response_headers: Default::default(),
549            request_variable_keys: Default::default(),
550            response_variable_keys: Default::default(),
551            error_settings: Default::default(),
552            label: "test label".into(),
553        });
554
555        let response1: http::Response<RouterBody> = http::Response::builder()
556            .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
557            .unwrap();
558        let response_key1 = ResponseKey::Entity {
559            index: 0,
560            inputs: Default::default(),
561            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
562        };
563
564        let response2 = http::Response::builder()
565            .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
566            .unwrap();
567        let response_key2 = ResponseKey::Entity {
568            index: 1,
569            inputs: Default::default(),
570            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
571        };
572
573        let supergraph_request = Arc::new(
574            http::Request::builder()
575                .body(graphql::Request::builder().build())
576                .unwrap(),
577        );
578
579        let res = super::aggregate_responses(vec![
580            process_response(
581                Ok(response1),
582                response_key1,
583                connector.clone(),
584                &Context::default(),
585                (None, Default::default()),
586                &None,
587                supergraph_request.clone(),
588            )
589            .await
590            .mapped_response,
591            process_response(
592                Ok(response2),
593                response_key2,
594                connector,
595                &Context::default(),
596                (None, Default::default()),
597                &None,
598                supergraph_request,
599            )
600            .await
601            .mapped_response,
602        ])
603        .unwrap();
604
605        assert_debug_snapshot!(res, @r###"
606        Response {
607            response: Response {
608                status: 200,
609                version: HTTP/1.1,
610                headers: {},
611                body: Response {
612                    label: None,
613                    data: Some(
614                        Object({
615                            "_entities": Array([
616                                Object({
617                                    "id": String(
618                                        "1",
619                                    ),
620                                }),
621                                Object({
622                                    "id": String(
623                                        "2",
624                                    ),
625                                }),
626                            ]),
627                        }),
628                    ),
629                    path: None,
630                    errors: [],
631                    extensions: {},
632                    has_next: None,
633                    subscribed: None,
634                    created_at: None,
635                    incremental: [],
636                },
637            },
638        }
639        "###);
640    }
641
642    #[tokio::test]
643    async fn test_handle_responses_batch() {
644        let connector = Arc::new(Connector {
645            spec: ConnectSpec::V0_2,
646            id: ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
647            transport: HttpJsonTransport {
648                source_template: "http://localhost/api".parse().ok(),
649                connect_template: "/path".parse().unwrap(),
650                method: HTTPMethod::Post,
651                body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
652                ..Default::default()
653            },
654            selection: JSONSelection::parse("$.data { id name }").unwrap(),
655            entity_resolver: Some(EntityResolver::TypeBatch),
656            config: Default::default(),
657            max_requests: None,
658            batch_settings: None,
659            request_headers: Default::default(),
660            response_headers: Default::default(),
661            request_variable_keys: Default::default(),
662            response_variable_keys: Default::default(),
663            error_settings: Default::default(),
664            label: "test label".into(),
665        });
666
667        let keys = connector
668            .resolvable_key(
669                &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
670                    .unwrap(),
671            )
672            .unwrap()
673            .unwrap();
674
675        let response1: http::Response<RouterBody> = http::Response::builder()
676            // different order from the request inputs
677            .body(router::body::from_bytes(
678                r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
679            ))
680            .unwrap();
681
682        let mut inputs: RequestInputs = RequestInputs::default();
683        let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
684        inputs.batch = representations
685            .as_array()
686            .unwrap()
687            .iter()
688            .cloned()
689            .map(|v| v.as_object().unwrap().clone())
690            .collect_vec();
691
692        let response_key1 = ResponseKey::BatchEntity {
693            selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
694            keys,
695            inputs,
696        };
697
698        let supergraph_request = Arc::new(
699            http::Request::builder()
700                .body(graphql::Request::builder().build())
701                .unwrap(),
702        );
703
704        let res = super::aggregate_responses(vec![
705            process_response(
706                Ok(response1),
707                response_key1,
708                connector.clone(),
709                &Context::default(),
710                (None, Default::default()),
711                &None,
712                supergraph_request,
713            )
714            .await
715            .mapped_response,
716        ])
717        .unwrap();
718
719        assert_debug_snapshot!(res, @r#"
720        Response {
721            response: Response {
722                status: 200,
723                version: HTTP/1.1,
724                headers: {},
725                body: Response {
726                    label: None,
727                    data: Some(
728                        Object({
729                            "_entities": Array([
730                                Object({
731                                    "id": String(
732                                        "1",
733                                    ),
734                                    "name": String(
735                                        "A",
736                                    ),
737                                }),
738                                Object({
739                                    "id": String(
740                                        "2",
741                                    ),
742                                    "name": String(
743                                        "B",
744                                    ),
745                                }),
746                            ]),
747                        }),
748                    ),
749                    path: None,
750                    errors: [],
751                    extensions: {},
752                    has_next: None,
753                    subscribed: None,
754                    created_at: None,
755                    incremental: [],
756                },
757            },
758        }
759        "#);
760    }
761
762    #[tokio::test]
763    async fn test_handle_responses_entity_field() {
764        let connector = Arc::new(Connector {
765            spec: ConnectSpec::V0_1,
766            id: ConnectId::new(
767                "subgraph_name".into(),
768                None,
769                name!(User),
770                name!(field),
771                None,
772                0,
773            ),
774            transport: HttpJsonTransport {
775                source_template: "http://localhost/api".parse().ok(),
776                connect_template: "/path".parse().unwrap(),
777                ..Default::default()
778            },
779            selection: JSONSelection::parse("$.data").unwrap(),
780            entity_resolver: Some(EntityResolver::Implicit),
781            config: Default::default(),
782            max_requests: None,
783            batch_settings: None,
784            request_headers: Default::default(),
785            response_headers: Default::default(),
786            request_variable_keys: Default::default(),
787            response_variable_keys: Default::default(),
788            error_settings: Default::default(),
789            label: "test label".into(),
790        });
791
792        let response1: http::Response<RouterBody> = http::Response::builder()
793            .body(router::body::from_bytes(r#"{"data":"value1"}"#))
794            .unwrap();
795        let response_key1 = ResponseKey::EntityField {
796            index: 0,
797            inputs: Default::default(),
798            field_name: "field".to_string(),
799            typename: Some(name!("User")),
800            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
801        };
802
803        let response2 = http::Response::builder()
804            .body(router::body::from_bytes(r#"{"data":"value2"}"#))
805            .unwrap();
806        let response_key2 = ResponseKey::EntityField {
807            index: 1,
808            inputs: Default::default(),
809            field_name: "field".to_string(),
810            typename: Some(name!("User")),
811            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
812        };
813
814        let supergraph_request = Arc::new(
815            http::Request::builder()
816                .body(graphql::Request::builder().build())
817                .unwrap(),
818        );
819
820        let res = super::aggregate_responses(vec![
821            process_response(
822                Ok(response1),
823                response_key1,
824                connector.clone(),
825                &Context::default(),
826                (None, Default::default()),
827                &None,
828                supergraph_request.clone(),
829            )
830            .await
831            .mapped_response,
832            process_response(
833                Ok(response2),
834                response_key2,
835                connector,
836                &Context::default(),
837                (None, Default::default()),
838                &None,
839                supergraph_request,
840            )
841            .await
842            .mapped_response,
843        ])
844        .unwrap();
845
846        assert_debug_snapshot!(res, @r###"
847        Response {
848            response: Response {
849                status: 200,
850                version: HTTP/1.1,
851                headers: {},
852                body: Response {
853                    label: None,
854                    data: Some(
855                        Object({
856                            "_entities": Array([
857                                Object({
858                                    "__typename": String(
859                                        "User",
860                                    ),
861                                    "field": String(
862                                        "value1",
863                                    ),
864                                }),
865                                Object({
866                                    "__typename": String(
867                                        "User",
868                                    ),
869                                    "field": String(
870                                        "value2",
871                                    ),
872                                }),
873                            ]),
874                        }),
875                    ),
876                    path: None,
877                    errors: [],
878                    extensions: {},
879                    has_next: None,
880                    subscribed: None,
881                    created_at: None,
882                    incremental: [],
883                },
884            },
885        }
886        "###);
887    }
888
889    #[tokio::test]
    // Aggregates four entity responses (a non-JSON plain-text body, a 404, a JSON body that
    // maps cleanly, and a 500) and snapshots the combined GraphQL response. Per the snapshot
    // below, only entity index 2 yields data; indices 0, 1 and 3 are null and each one
    // contributes an entry to `errors`.
890    async fn test_handle_responses_errors() {
        // Connector shared by all four calls: selection `$.data`, explicit entity resolver.
891        let connector = Arc::new(Connector {
892            spec: ConnectSpec::V0_1,
893            id: ConnectId::new(
894                "subgraph_name".into(),
895                None,
896                name!(Query),
897                name!(user),
898                None,
899                0,
900            ),
901            transport: HttpJsonTransport {
902                source_template: "http://localhost/api".parse().ok(),
903                connect_template: "/path".parse().unwrap(),
904                ..Default::default()
905            },
906            selection: JSONSelection::parse("$.data").unwrap(),
907            entity_resolver: Some(EntityResolver::Explicit),
908            config: Default::default(),
909            max_requests: None,
910            batch_settings: None,
911            request_headers: Default::default(),
912            response_headers: Default::default(),
913            request_variable_keys: Default::default(),
914            response_variable_keys: Default::default(),
915            error_settings: Default::default(),
916            label: "test label".into(),
917        });
918
        // Entity 0: no explicit status (the snapshot reports 200 for it) and a body that is
        // not JSON. Expected to surface as CONNECTOR_RESPONSE_INVALID.
919        let response_plaintext: http::Response<RouterBody> = http::Response::builder()
920            .body(router::body::from_bytes(r#"plain text"#))
921            .unwrap();
922        let response_key_plaintext = ResponseKey::Entity {
923            index: 0,
924            inputs: Default::default(),
925            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
926        };
927
        // Entity 1: HTTP 404 with a JSON error body. Expected to surface as CONNECTOR_FETCH.
928        let response1: http::Response<RouterBody> = http::Response::builder()
929            .status(404)
930            .body(router::body::from_bytes(r#"{"error":"not found"}"#))
931            .unwrap();
932        let response_key1 = ResponseKey::Entity {
933            index: 1,
934            inputs: Default::default(),
935            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
936        };
937
        // Entity 2: the only response whose body has a `data` member for `$.data` to select.
938        let response2 = http::Response::builder()
939            .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
940            .unwrap();
941        let response_key2 = ResponseKey::Entity {
942            index: 2,
943            inputs: Default::default(),
944            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
945        };
946
        // Entity 3: HTTP 500 with a JSON error body. Expected to surface as CONNECTOR_FETCH.
947        let response3: http::Response<RouterBody> = http::Response::builder()
948            .status(500)
949            .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
950            .unwrap();
951        let response_key3 = ResponseKey::Entity {
952            index: 3,
953            inputs: Default::default(),
954            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
955        };
956
        // A default-built GraphQL request wrapped in an `Arc`, shared by every call below.
957        let supergraph_request = Arc::new(
958            http::Request::builder()
959                .body(graphql::Request::builder().build())
960                .unwrap(),
961        );
962
        // Each response is processed and awaited in turn, in entity-index order; only the
        // `mapped_response` of each result is handed to `aggregate_responses`.
        // `res` is `mut` because the error IDs are rewritten before snapshotting.
963        let mut res = super::aggregate_responses(vec![
964            process_response(
965                Ok(response_plaintext),
966                response_key_plaintext,
967                connector.clone(),
968                &Context::default(),
969                (None, Default::default()),
970                &None,
971                supergraph_request.clone(),
972            )
973            .await
974            .mapped_response,
975            process_response(
976                Ok(response1),
977                response_key1,
978                connector.clone(),
979                &Context::default(),
980                (None, Default::default()),
981                &None,
982                supergraph_request.clone(),
983            )
984            .await
985            .mapped_response,
986            process_response(
987                Ok(response2),
988                response_key2,
989                connector.clone(),
990                &Context::default(),
991                (None, Default::default()),
992                &None,
993                supergraph_request.clone(),
994            )
995            .await
996            .mapped_response,
997            process_response(
998                Ok(response3),
999                response_key3,
1000                connector,
1001                &Context::default(),
1002                (None, Default::default()),
1003                &None,
1004                supergraph_request,
1005            )
1006            .await
1007            .mapped_response,
1008        ])
1009        .unwrap();
1010
1011        // Overwrite error IDs to avoid random Uuid mismatch.
1012        // Since assert_debug_snapshot does not support redactions (which would be useful for error IDs),
1013        // we have to do it manually.
1014        let body = res.response.body_mut();
1015        body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();
1016
        // Expected shape: the aggregated response itself is status 200. `_entities` keeps one
        // slot per input index, with null for the three failures. Each error's path points at
        // `_entities[<index>]` and its `extensions.http.status` carries the upstream status
        // (200, 404, 500).
1017        assert_debug_snapshot!(res, @r#"
1018        Response {
1019            response: Response {
1020                status: 200,
1021                version: HTTP/1.1,
1022                headers: {},
1023                body: Response {
1024                    label: None,
1025                    data: Some(
1026                        Object({
1027                            "_entities": Array([
1028                                Null,
1029                                Null,
1030                                Object({
1031                                    "id": String(
1032                                        "2",
1033                                    ),
1034                                }),
1035                                Null,
1036                            ]),
1037                        }),
1038                    ),
1039                    path: None,
1040                    errors: [
1041                        Error {
1042                            message: "The server returned data in an unexpected format.",
1043                            locations: [],
1044                            path: Some(
1045                                Path(
1046                                    [
1047                                        Key(
1048                                            "_entities",
1049                                            None,
1050                                        ),
1051                                        Index(
1052                                            0,
1053                                        ),
1054                                    ],
1055                                ),
1056                            ),
1057                            extensions: {
1058                                "code": String(
1059                                    "CONNECTOR_RESPONSE_INVALID",
1060                                ),
1061                                "service": String(
1062                                    "subgraph_name",
1063                                ),
1064                                "connector": Object({
1065                                    "coordinate": String(
1066                                        "subgraph_name:Query.user@connect[0]",
1067                                    ),
1068                                }),
1069                                "http": Object({
1070                                    "status": Number(200),
1071                                }),
1072                                "apollo.private.subgraph.name": String(
1073                                    "subgraph_name",
1074                                ),
1075                            },
1076                            apollo_id: 00000000-0000-0000-0000-000000000000,
1077                        },
1078                        Error {
1079                            message: "Request failed",
1080                            locations: [],
1081                            path: Some(
1082                                Path(
1083                                    [
1084                                        Key(
1085                                            "_entities",
1086                                            None,
1087                                        ),
1088                                        Index(
1089                                            1,
1090                                        ),
1091                                    ],
1092                                ),
1093                            ),
1094                            extensions: {
1095                                "code": String(
1096                                    "CONNECTOR_FETCH",
1097                                ),
1098                                "service": String(
1099                                    "subgraph_name",
1100                                ),
1101                                "connector": Object({
1102                                    "coordinate": String(
1103                                        "subgraph_name:Query.user@connect[0]",
1104                                    ),
1105                                }),
1106                                "http": Object({
1107                                    "status": Number(404),
1108                                }),
1109                                "apollo.private.subgraph.name": String(
1110                                    "subgraph_name",
1111                                ),
1112                            },
1113                            apollo_id: 00000000-0000-0000-0000-000000000000,
1114                        },
1115                        Error {
1116                            message: "Request failed",
1117                            locations: [],
1118                            path: Some(
1119                                Path(
1120                                    [
1121                                        Key(
1122                                            "_entities",
1123                                            None,
1124                                        ),
1125                                        Index(
1126                                            3,
1127                                        ),
1128                                    ],
1129                                ),
1130                            ),
1131                            extensions: {
1132                                "code": String(
1133                                    "CONNECTOR_FETCH",
1134                                ),
1135                                "service": String(
1136                                    "subgraph_name",
1137                                ),
1138                                "connector": Object({
1139                                    "coordinate": String(
1140                                        "subgraph_name:Query.user@connect[0]",
1141                                    ),
1142                                }),
1143                                "http": Object({
1144                                    "status": Number(500),
1145                                }),
1146                                "apollo.private.subgraph.name": String(
1147                                    "subgraph_name",
1148                                ),
1149                            },
1150                            apollo_id: 00000000-0000-0000-0000-000000000000,
1151                        },
1152                    ],
1153                    extensions: {},
1154                    has_next: None,
1155                    subscribed: None,
1156                    created_at: None,
1157                    incremental: [],
1158                },
1159            },
1160        }
1161        "#);
1162    }
1163
1164    #[tokio::test]
    // Checks that a `$status` selection resolves to the upstream HTTP status code: a 201
    // response with an empty JSON object body is expected to produce `hello: 201` in the
    // aggregated data, while the aggregated response itself is still status 200.
1165    async fn test_handle_responses_status() {
1166        let selection = JSONSelection::parse("$status").unwrap();
1167        let connector = Arc::new(Connector {
1168            spec: ConnectSpec::V0_1,
1169            id: ConnectId::new(
1170                "subgraph_name".into(),
1171                None,
1172                name!(Query),
1173                name!(hello),
1174                None,
1175                0,
1176            ),
1177            transport: HttpJsonTransport {
1178                source_template: "http://localhost/api".parse().ok(),
1179                connect_template: "/path".parse().unwrap(),
1180                ..Default::default()
1181            },
            // NOTE(review): `selection` is not used again in this test (the response key
            // below re-parses "$status"), so this clone looks unnecessary. Confirm before
            // changing.
1182            selection: selection.clone(),
1183            entity_resolver: None,
1184            config: Default::default(),
1185            max_requests: None,
1186            batch_settings: None,
1187            request_headers: Default::default(),
1188            response_headers: Default::default(),
1189            request_variable_keys: Default::default(),
            // Unlike the other tests in this module, the connector registers the `Status`
            // namespace as a response variable. Presumably this is what makes `$status`
            // available to the selection. TODO confirm.
1190            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1191            error_settings: Default::default(),
1192            label: "test label".into(),
1193        });
1194
        // Upstream reply: HTTP 201 with an empty JSON object, so the only value the
        // selection can read is the status.
1195        let response1: http::Response<RouterBody> = http::Response::builder()
1196            .status(201)
1197            .body(router::body::from_bytes(r#"{}"#))
1198            .unwrap();
        // Root-field key (not an entity): the result is expected under `hello`.
1199        let response_key1 = ResponseKey::RootField {
1200            name: "hello".to_string(),
1201            inputs: Default::default(),
1202            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1203        };
1204
        // A default-built GraphQL request wrapped in an `Arc` for the single call below.
1205        let supergraph_request = Arc::new(
1206            http::Request::builder()
1207                .body(graphql::Request::builder().build())
1208                .unwrap(),
1209        );
1210
        // Process the single response and aggregate its `mapped_response`.
1211        let res = super::aggregate_responses(vec![
1212            process_response(
1213                Ok(response1),
1214                response_key1,
1215                connector,
1216                &Context::default(),
1217                (None, Default::default()),
1218                &None,
1219                supergraph_request,
1220            )
1221            .await
1222            .mapped_response,
1223        ])
1224        .unwrap();
1225
        // Expected: no errors, and `hello` holds the upstream status as a number.
1226        assert_debug_snapshot!(res, @r###"
1227        Response {
1228            response: Response {
1229                status: 200,
1230                version: HTTP/1.1,
1231                headers: {},
1232                body: Response {
1233                    label: None,
1234                    data: Some(
1235                        Object({
1236                            "hello": Number(201),
1237                        }),
1238                    ),
1239                    path: None,
1240                    errors: [],
1241                    extensions: {},
1242                    has_next: None,
1243                    subscribed: None,
1244                    created_at: None,
1245                    incremental: [],
1246                },
1247            },
1248        }
1249        "###);
1250    }
1251}