apollo_router/plugins/connectors/
handle_responses.rs

1use std::sync::Arc;
2
3use apollo_federation::connectors::Connector;
4use apollo_federation::connectors::runtime::debug::ConnectorContext;
5use apollo_federation::connectors::runtime::debug::DebugRequest;
6use apollo_federation::connectors::runtime::debug::SelectionData;
7use apollo_federation::connectors::runtime::errors::Error;
8use apollo_federation::connectors::runtime::errors::RuntimeError;
9use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
10use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
11use apollo_federation::connectors::runtime::key::ResponseKey;
12use apollo_federation::connectors::runtime::mapping::Problem;
13use apollo_federation::connectors::runtime::responses::HandleResponseError;
14use apollo_federation::connectors::runtime::responses::MappedResponse;
15use apollo_federation::connectors::runtime::responses::deserialize_response;
16use apollo_federation::connectors::runtime::responses::handle_raw_response;
17use axum::body::HttpBody;
18use http::response::Parts;
19use http_body_util::BodyExt;
20use opentelemetry::KeyValue;
21use parking_lot::Mutex;
22use serde_json_bytes::Map;
23use serde_json_bytes::Value;
24use tracing::Span;
25
26use crate::Context;
27use crate::graphql;
28use crate::json_ext::Path;
29use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
30use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
31use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
32use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
33use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
34use crate::plugins::telemetry::config_new::events::log_event;
35use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
36use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
37use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
38use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
39use crate::services::connect::Response;
40use crate::services::connector;
41use crate::services::fetch::AddSubgraphNameExt;
42
43// --- ERRORS ------------------------------------------------------------------
44
45impl From<RuntimeError> for graphql::Error {
46    fn from(error: RuntimeError) -> Self {
47        let path: Path = (&error.path).into();
48
49        let err = graphql::Error::builder()
50            .message(&error.message)
51            .extensions(error.extensions())
52            .extension_code(error.code())
53            .path(path)
54            .build();
55
56        if let Some(subgraph_name) = &error.subgraph_name {
57            err.with_subgraph_name(subgraph_name)
58        } else {
59            err
60        }
61    }
62}
63
64// --- handle_responses --------------------------------------------------------
65
66pub(crate) async fn process_response<T: HttpBody>(
67    result: Result<http::Response<T>, Error>,
68    response_key: ResponseKey,
69    connector: Arc<Connector>,
70    context: &Context,
71    debug_request: DebugRequest,
72    debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
73    supergraph_request: Arc<http::Request<crate::graphql::Request>>,
74) -> connector::request_service::Response {
75    let (mapped_response, result) = match result {
76        // This occurs when we short-circuit the request when over the limit
77        Err(error) => {
78            Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
79            (
80                MappedResponse::Error {
81                    error: error.to_runtime_error(&connector, &response_key),
82                    key: response_key,
83                    problems: Vec::new(),
84                },
85                Err(error),
86            )
87        }
88        Ok(response) => {
89            let (parts, body) = response.into_parts();
90
91            let result = Ok(TransportResponse::Http(HttpResponse {
92                inner: parts.clone(),
93            }));
94
95            let make_err = || {
96                let mut err = RuntimeError::new(
97                    "The server returned data in an unexpected format.".to_string(),
98                    &response_key,
99                );
100                err.subgraph_name = Some(connector.id.subgraph_name.clone());
101                err = err.with_code("CONNECTOR_RESPONSE_INVALID");
102                err.coordinate = Some(connector.id.coordinate());
103                err = err.extension(
104                    "http",
105                    Value::Object(Map::from_iter([(
106                        "status".into(),
107                        Value::Number(parts.status.as_u16().into()),
108                    )])),
109                );
110                err
111            };
112
113            let deserialized_body = body
114                .collect()
115                .await
116                .map_err(|_| ())
117                .and_then(|body| {
118                    let body = body.to_bytes();
119                    let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
120                        if let Some(debug_context) = debug_context {
121                            debug_context.lock().push_invalid_response(
122                                debug_request.0.clone(),
123                                &parts,
124                                &body,
125                                &connector.error_settings,
126                                debug_request.1.clone(),
127                            );
128                        }
129                    });
130                    log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
131                    raw
132                })
133                .map_err(|()| make_err());
134
135            // If this errors, it will write to the debug context because it
136            // has access to the raw bytes, so we can't write to it again
137            // in any RawResponse::Error branches.
138            let mapped = match &deserialized_body {
139                Err(error) => MappedResponse::Error {
140                    error: error.clone(),
141                    key: response_key,
142                    problems: Vec::new(),
143                },
144                Ok(data) => handle_raw_response(
145                    data,
146                    &parts,
147                    response_key,
148                    &connector,
149                    context,
150                    supergraph_request.headers(),
151                ),
152            };
153
154            if let Some(debug) = debug_context {
155                let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
156                debug_problems.extend(debug_request.1);
157
158                let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
159                    Some(SelectionData {
160                        source: connector.selection.to_string(),
161                        transformed: key.selection().to_string(),
162                        result: Some(data.clone()),
163                    })
164                } else {
165                    None
166                };
167
168                debug.lock().push_response(
169                    debug_request.0,
170                    &parts,
171                    deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
172                    selection_data,
173                    &connector.error_settings,
174                    debug_problems,
175                );
176            }
177            if matches!(mapped, MappedResponse::Data { .. }) {
178                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
179            } else {
180                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
181            }
182
183            (mapped, result)
184        }
185    };
186
187    if let MappedResponse::Error { ref error, .. } = mapped_response {
188        emit_error_event(error.code(), &error.message, Some((*error.path).into()));
189    }
190
191    connector::request_service::Response {
192        transport_result: result,
193        mapped_response,
194    }
195}
196
197pub(crate) fn aggregate_responses(
198    responses: Vec<MappedResponse>,
199) -> Result<Response, HandleResponseError> {
200    let mut data = serde_json_bytes::Map::new();
201    let mut errors = Vec::new();
202    let count = responses.len();
203
204    for mapped in responses {
205        mapped.add_to_data(&mut data, &mut errors, count)?;
206    }
207
208    let data = if data.is_empty() {
209        Value::Null
210    } else {
211        Value::Object(data)
212    };
213
214    Span::current().record(
215        OTEL_STATUS_CODE,
216        if errors.is_empty() {
217            OTEL_STATUS_CODE_OK
218        } else {
219            OTEL_STATUS_CODE_ERROR
220        },
221    );
222
223    Ok(Response {
224        response: http::Response::builder()
225            .body(
226                graphql::Response::builder()
227                    .data(data)
228                    .errors(errors.into_iter().map(|e| e.into()).collect())
229                    .build(),
230            )
231            .unwrap(),
232    })
233}
234
235fn log_connectors_event(
236    context: &Context,
237    body: &[u8],
238    parts: &Parts,
239    response_key: ResponseKey,
240    connector: &Connector,
241) {
242    let log_response_level = context
243        .extensions()
244        .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
245        .and_then(|event| {
246            // TODO: evaluate if this is still needed now that we're cloning the body anyway
247            // Create a temporary response here so we can evaluate the condition. This response
248            // is missing any information about the mapped response, because we don't have that
249            // yet. This means that we cannot correctly evaluate any condition that relies on
250            // the mapped response data or mapping problems. But we can't wait until we do have
251            // that information, because this is the only place we have the body bytes (without
252            // making an expensive clone of the body). So we either need to not expose any
253            // selector which can be used as a condition that requires mapping information, or
254            // we must document that such selectors cannot be used as conditions on standard
255            // connectors events.
256
257            let response = connector::request_service::Response {
258                transport_result: Ok(TransportResponse::Http(HttpResponse {
259                    inner: parts.clone(),
260                })),
261                mapped_response: MappedResponse::Data {
262                    data: Value::Null,
263                    key: response_key,
264                    problems: vec![],
265                },
266            };
267            if event.condition.evaluate_response(&response) {
268                Some(event.level)
269            } else {
270                None
271            }
272        });
273
274    if let Some(level) = log_response_level {
275        let mut attrs = Vec::with_capacity(4);
276        #[cfg(test)]
277        let headers = {
278            let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
279                .headers
280                .iter()
281                .map(|(name, val)| (name.to_string(), val.clone()))
282                .collect();
283            headers.sort_keys();
284            headers
285        };
286        #[cfg(not(test))]
287        let headers = &parts.headers;
288
289        attrs.push(KeyValue::new(
290            HTTP_RESPONSE_HEADERS,
291            opentelemetry::Value::String(format!("{headers:?}").into()),
292        ));
293        attrs.push(KeyValue::new(
294            HTTP_RESPONSE_STATUS,
295            opentelemetry::Value::String(format!("{}", parts.status).into()),
296        ));
297        attrs.push(KeyValue::new(
298            HTTP_RESPONSE_VERSION,
299            opentelemetry::Value::String(format!("{:?}", parts.version).into()),
300        ));
301        attrs.push(KeyValue::new(
302            HTTP_RESPONSE_BODY,
303            opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
304        ));
305
306        log_event(
307            level,
308            "connector.response",
309            attrs,
310            &format!("Response from connector {label:?}", label = connector.label),
311        );
312    }
313}
314
315#[cfg(test)]
316mod tests {
317    use std::sync::Arc;
318
319    use apollo_compiler::Schema;
320    use apollo_compiler::collections::IndexMap;
321    use apollo_compiler::name;
322    use apollo_compiler::response::JsonValue;
323    use apollo_federation::connectors::ConnectId;
324    use apollo_federation::connectors::ConnectSpec;
325    use apollo_federation::connectors::Connector;
326    use apollo_federation::connectors::ConnectorErrorsSettings;
327    use apollo_federation::connectors::EntityResolver;
328    use apollo_federation::connectors::HTTPMethod;
329    use apollo_federation::connectors::HttpJsonTransport;
330    use apollo_federation::connectors::JSONSelection;
331    use apollo_federation::connectors::Label;
332    use apollo_federation::connectors::Namespace;
333    use apollo_federation::connectors::runtime::inputs::RequestInputs;
334    use apollo_federation::connectors::runtime::key::ResponseKey;
335    use insta::assert_debug_snapshot;
336    use itertools::Itertools;
337    use serde_json_bytes::json;
338
339    use crate::Context;
340    use crate::graphql;
341    use crate::plugins::connectors::handle_responses::process_response;
342    use crate::services::router;
343    use crate::services::router::body::RouterBody;
344
345    #[tokio::test]
346    async fn test_handle_responses_root_fields() {
347        let connector = Arc::new(Connector {
348            spec: ConnectSpec::V0_1,
349            id: ConnectId::new(
350                "subgraph_name".into(),
351                None,
352                name!(Query),
353                name!(hello),
354                None,
355                0,
356            ),
357            transport: HttpJsonTransport {
358                source_template: "http://localhost/api".parse().ok(),
359                connect_template: "/path".parse().unwrap(),
360                ..Default::default()
361            },
362            selection: JSONSelection::parse("$.data").unwrap(),
363            entity_resolver: None,
364            config: Default::default(),
365            max_requests: None,
366            batch_settings: None,
367            request_headers: Default::default(),
368            response_headers: Default::default(),
369            request_variable_keys: Default::default(),
370            response_variable_keys: Default::default(),
371            error_settings: Default::default(),
372            label: "test label".into(),
373        });
374
375        let response1: http::Response<RouterBody> = http::Response::builder()
376            .body(router::body::from_bytes(r#"{"data":"world"}"#))
377            .unwrap();
378        let response_key1 = ResponseKey::RootField {
379            name: "hello".to_string(),
380            inputs: Default::default(),
381            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
382        };
383
384        let response2 = http::Response::builder()
385            .body(router::body::from_bytes(r#"{"data":"world"}"#))
386            .unwrap();
387        let response_key2 = ResponseKey::RootField {
388            name: "hello2".to_string(),
389            inputs: Default::default(),
390            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
391        };
392
393        let supergraph_request = Arc::new(
394            http::Request::builder()
395                .body(graphql::Request::builder().build())
396                .unwrap(),
397        );
398
399        let res = super::aggregate_responses(vec![
400            process_response(
401                Ok(response1),
402                response_key1,
403                connector.clone(),
404                &Context::default(),
405                (None, Default::default()),
406                None,
407                supergraph_request.clone(),
408            )
409            .await
410            .mapped_response,
411            process_response(
412                Ok(response2),
413                response_key2,
414                connector,
415                &Context::default(),
416                (None, Default::default()),
417                None,
418                supergraph_request,
419            )
420            .await
421            .mapped_response,
422        ])
423        .unwrap();
424
425        assert_debug_snapshot!(res, @r###"
426        Response {
427            response: Response {
428                status: 200,
429                version: HTTP/1.1,
430                headers: {},
431                body: Response {
432                    label: None,
433                    data: Some(
434                        Object({
435                            "hello": String(
436                                "world",
437                            ),
438                            "hello2": String(
439                                "world",
440                            ),
441                        }),
442                    ),
443                    path: None,
444                    errors: [],
445                    extensions: {},
446                    has_next: None,
447                    subscribed: None,
448                    created_at: None,
449                    incremental: [],
450                },
451            },
452        }
453        "###);
454    }
455
456    #[tokio::test]
457    async fn test_handle_responses_entities() {
458        let connector = Arc::new(Connector {
459            spec: ConnectSpec::V0_1,
460            id: ConnectId::new(
461                "subgraph_name".into(),
462                None,
463                name!(Query),
464                name!(user),
465                None,
466                0,
467            ),
468            transport: HttpJsonTransport {
469                source_template: "http://localhost/api".parse().ok(),
470                connect_template: "/path".parse().unwrap(),
471                ..Default::default()
472            },
473            selection: JSONSelection::parse("$.data { id }").unwrap(),
474            entity_resolver: Some(EntityResolver::Explicit),
475            config: Default::default(),
476            max_requests: None,
477            batch_settings: None,
478            request_headers: Default::default(),
479            response_headers: Default::default(),
480            request_variable_keys: Default::default(),
481            response_variable_keys: Default::default(),
482            error_settings: Default::default(),
483            label: "test label".into(),
484        });
485
486        let response1: http::Response<RouterBody> = http::Response::builder()
487            .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
488            .unwrap();
489        let response_key1 = ResponseKey::Entity {
490            index: 0,
491            inputs: Default::default(),
492            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
493        };
494
495        let response2 = http::Response::builder()
496            .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
497            .unwrap();
498        let response_key2 = ResponseKey::Entity {
499            index: 1,
500            inputs: Default::default(),
501            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
502        };
503
504        let supergraph_request = Arc::new(
505            http::Request::builder()
506                .body(graphql::Request::builder().build())
507                .unwrap(),
508        );
509
510        let res = super::aggregate_responses(vec![
511            process_response(
512                Ok(response1),
513                response_key1,
514                connector.clone(),
515                &Context::default(),
516                (None, Default::default()),
517                None,
518                supergraph_request.clone(),
519            )
520            .await
521            .mapped_response,
522            process_response(
523                Ok(response2),
524                response_key2,
525                connector,
526                &Context::default(),
527                (None, Default::default()),
528                None,
529                supergraph_request,
530            )
531            .await
532            .mapped_response,
533        ])
534        .unwrap();
535
536        assert_debug_snapshot!(res, @r###"
537        Response {
538            response: Response {
539                status: 200,
540                version: HTTP/1.1,
541                headers: {},
542                body: Response {
543                    label: None,
544                    data: Some(
545                        Object({
546                            "_entities": Array([
547                                Object({
548                                    "id": String(
549                                        "1",
550                                    ),
551                                }),
552                                Object({
553                                    "id": String(
554                                        "2",
555                                    ),
556                                }),
557                            ]),
558                        }),
559                    ),
560                    path: None,
561                    errors: [],
562                    extensions: {},
563                    has_next: None,
564                    subscribed: None,
565                    created_at: None,
566                    incremental: [],
567                },
568            },
569        }
570        "###);
571    }
572
573    #[tokio::test]
574    async fn test_handle_responses_batch() {
575        let connector = Arc::new(Connector {
576            spec: ConnectSpec::V0_2,
577            id: ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
578            transport: HttpJsonTransport {
579                source_template: "http://localhost/api".parse().ok(),
580                connect_template: "/path".parse().unwrap(),
581                method: HTTPMethod::Post,
582                body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
583                ..Default::default()
584            },
585            selection: JSONSelection::parse("$.data { id name }").unwrap(),
586            entity_resolver: Some(EntityResolver::TypeBatch),
587            config: Default::default(),
588            max_requests: None,
589            batch_settings: None,
590            request_headers: Default::default(),
591            response_headers: Default::default(),
592            request_variable_keys: Default::default(),
593            response_variable_keys: Default::default(),
594            error_settings: Default::default(),
595            label: "test label".into(),
596        });
597
598        let keys = connector
599            .resolvable_key(
600                &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
601                    .unwrap(),
602            )
603            .unwrap()
604            .unwrap();
605
606        let response1: http::Response<RouterBody> = http::Response::builder()
607            // different order from the request inputs
608            .body(router::body::from_bytes(
609                r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
610            ))
611            .unwrap();
612
613        let mut inputs: RequestInputs = RequestInputs::default();
614        let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
615        inputs.batch = representations
616            .as_array()
617            .unwrap()
618            .iter()
619            .cloned()
620            .map(|v| v.as_object().unwrap().clone())
621            .collect_vec();
622
623        let response_key1 = ResponseKey::BatchEntity {
624            selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
625            keys,
626            inputs,
627        };
628
629        let supergraph_request = Arc::new(
630            http::Request::builder()
631                .body(graphql::Request::builder().build())
632                .unwrap(),
633        );
634
635        let res = super::aggregate_responses(vec![
636            process_response(
637                Ok(response1),
638                response_key1,
639                connector.clone(),
640                &Context::default(),
641                (None, Default::default()),
642                None,
643                supergraph_request,
644            )
645            .await
646            .mapped_response,
647        ])
648        .unwrap();
649
650        assert_debug_snapshot!(res, @r#"
651        Response {
652            response: Response {
653                status: 200,
654                version: HTTP/1.1,
655                headers: {},
656                body: Response {
657                    label: None,
658                    data: Some(
659                        Object({
660                            "_entities": Array([
661                                Object({
662                                    "id": String(
663                                        "1",
664                                    ),
665                                    "name": String(
666                                        "A",
667                                    ),
668                                }),
669                                Object({
670                                    "id": String(
671                                        "2",
672                                    ),
673                                    "name": String(
674                                        "B",
675                                    ),
676                                }),
677                            ]),
678                        }),
679                    ),
680                    path: None,
681                    errors: [],
682                    extensions: {},
683                    has_next: None,
684                    subscribed: None,
685                    created_at: None,
686                    incremental: [],
687                },
688            },
689        }
690        "#);
691    }
692
693    #[tokio::test]
694    async fn test_handle_responses_entity_field() {
695        let connector = Arc::new(Connector {
696            spec: ConnectSpec::V0_1,
697            id: ConnectId::new(
698                "subgraph_name".into(),
699                None,
700                name!(User),
701                name!(field),
702                None,
703                0,
704            ),
705            transport: HttpJsonTransport {
706                source_template: "http://localhost/api".parse().ok(),
707                connect_template: "/path".parse().unwrap(),
708                ..Default::default()
709            },
710            selection: JSONSelection::parse("$.data").unwrap(),
711            entity_resolver: Some(EntityResolver::Implicit),
712            config: Default::default(),
713            max_requests: None,
714            batch_settings: None,
715            request_headers: Default::default(),
716            response_headers: Default::default(),
717            request_variable_keys: Default::default(),
718            response_variable_keys: Default::default(),
719            error_settings: Default::default(),
720            label: "test label".into(),
721        });
722
723        let response1: http::Response<RouterBody> = http::Response::builder()
724            .body(router::body::from_bytes(r#"{"data":"value1"}"#))
725            .unwrap();
726        let response_key1 = ResponseKey::EntityField {
727            index: 0,
728            inputs: Default::default(),
729            field_name: "field".to_string(),
730            typename: Some(name!("User")),
731            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
732        };
733
734        let response2 = http::Response::builder()
735            .body(router::body::from_bytes(r#"{"data":"value2"}"#))
736            .unwrap();
737        let response_key2 = ResponseKey::EntityField {
738            index: 1,
739            inputs: Default::default(),
740            field_name: "field".to_string(),
741            typename: Some(name!("User")),
742            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
743        };
744
745        let supergraph_request = Arc::new(
746            http::Request::builder()
747                .body(graphql::Request::builder().build())
748                .unwrap(),
749        );
750
751        let res = super::aggregate_responses(vec![
752            process_response(
753                Ok(response1),
754                response_key1,
755                connector.clone(),
756                &Context::default(),
757                (None, Default::default()),
758                None,
759                supergraph_request.clone(),
760            )
761            .await
762            .mapped_response,
763            process_response(
764                Ok(response2),
765                response_key2,
766                connector,
767                &Context::default(),
768                (None, Default::default()),
769                None,
770                supergraph_request,
771            )
772            .await
773            .mapped_response,
774        ])
775        .unwrap();
776
777        assert_debug_snapshot!(res, @r###"
778        Response {
779            response: Response {
780                status: 200,
781                version: HTTP/1.1,
782                headers: {},
783                body: Response {
784                    label: None,
785                    data: Some(
786                        Object({
787                            "_entities": Array([
788                                Object({
789                                    "__typename": String(
790                                        "User",
791                                    ),
792                                    "field": String(
793                                        "value1",
794                                    ),
795                                }),
796                                Object({
797                                    "__typename": String(
798                                        "User",
799                                    ),
800                                    "field": String(
801                                        "value2",
802                                    ),
803                                }),
804                            ]),
805                        }),
806                    ),
807                    path: None,
808                    errors: [],
809                    extensions: {},
810                    has_next: None,
811                    subscribed: None,
812                    created_at: None,
813                    incremental: [],
814                },
815            },
816        }
817        "###);
818    }
819
820    #[tokio::test]
821    async fn test_handle_responses_errors() {
822        let connector = Arc::new(Connector {
823            spec: ConnectSpec::V0_1,
824            id: ConnectId::new(
825                "subgraph_name".into(),
826                None,
827                name!(Query),
828                name!(user),
829                None,
830                0,
831            ),
832            transport: HttpJsonTransport {
833                source_template: "http://localhost/api".parse().ok(),
834                connect_template: "/path".parse().unwrap(),
835                ..Default::default()
836            },
837            selection: JSONSelection::parse("$.data").unwrap(),
838            entity_resolver: Some(EntityResolver::Explicit),
839            config: Default::default(),
840            max_requests: None,
841            batch_settings: None,
842            request_headers: Default::default(),
843            response_headers: Default::default(),
844            request_variable_keys: Default::default(),
845            response_variable_keys: Default::default(),
846            error_settings: Default::default(),
847            label: "test label".into(),
848        });
849
850        let response_plaintext: http::Response<RouterBody> = http::Response::builder()
851            .body(router::body::from_bytes(r#"plain text"#))
852            .unwrap();
853        let response_key_plaintext = ResponseKey::Entity {
854            index: 0,
855            inputs: Default::default(),
856            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
857        };
858
859        let response1: http::Response<RouterBody> = http::Response::builder()
860            .status(404)
861            .body(router::body::from_bytes(r#"{"error":"not found"}"#))
862            .unwrap();
863        let response_key1 = ResponseKey::Entity {
864            index: 1,
865            inputs: Default::default(),
866            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
867        };
868
869        let response2 = http::Response::builder()
870            .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
871            .unwrap();
872        let response_key2 = ResponseKey::Entity {
873            index: 2,
874            inputs: Default::default(),
875            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
876        };
877
878        let response3: http::Response<RouterBody> = http::Response::builder()
879            .status(500)
880            .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
881            .unwrap();
882        let response_key3 = ResponseKey::Entity {
883            index: 3,
884            inputs: Default::default(),
885            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
886        };
887
888        let supergraph_request = Arc::new(
889            http::Request::builder()
890                .body(graphql::Request::builder().build())
891                .unwrap(),
892        );
893
894        let mut res = super::aggregate_responses(vec![
895            process_response(
896                Ok(response_plaintext),
897                response_key_plaintext,
898                connector.clone(),
899                &Context::default(),
900                (None, Default::default()),
901                None,
902                supergraph_request.clone(),
903            )
904            .await
905            .mapped_response,
906            process_response(
907                Ok(response1),
908                response_key1,
909                connector.clone(),
910                &Context::default(),
911                (None, Default::default()),
912                None,
913                supergraph_request.clone(),
914            )
915            .await
916            .mapped_response,
917            process_response(
918                Ok(response2),
919                response_key2,
920                connector.clone(),
921                &Context::default(),
922                (None, Default::default()),
923                None,
924                supergraph_request.clone(),
925            )
926            .await
927            .mapped_response,
928            process_response(
929                Ok(response3),
930                response_key3,
931                connector,
932                &Context::default(),
933                (None, Default::default()),
934                None,
935                supergraph_request,
936            )
937            .await
938            .mapped_response,
939        ])
940        .unwrap();
941
942        // Overwrite error IDs to avoid random Uuid mismatch.
943        // Since assert_debug_snapshot does not support redactions (which would be useful for error IDs),
944        // we have to do it manually.
945        let body = res.response.body_mut();
946        body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();
947
948        assert_debug_snapshot!(res, @r#"
949        Response {
950            response: Response {
951                status: 200,
952                version: HTTP/1.1,
953                headers: {},
954                body: Response {
955                    label: None,
956                    data: Some(
957                        Object({
958                            "_entities": Array([
959                                Null,
960                                Null,
961                                Object({
962                                    "id": String(
963                                        "2",
964                                    ),
965                                }),
966                                Null,
967                            ]),
968                        }),
969                    ),
970                    path: None,
971                    errors: [
972                        Error {
973                            message: "The server returned data in an unexpected format.",
974                            locations: [],
975                            path: Some(
976                                Path(
977                                    [
978                                        Key(
979                                            "_entities",
980                                            None,
981                                        ),
982                                        Index(
983                                            0,
984                                        ),
985                                    ],
986                                ),
987                            ),
988                            extensions: {
989                                "code": String(
990                                    "CONNECTOR_RESPONSE_INVALID",
991                                ),
992                                "service": String(
993                                    "subgraph_name",
994                                ),
995                                "connector": Object({
996                                    "coordinate": String(
997                                        "subgraph_name:Query.user[0]",
998                                    ),
999                                }),
1000                                "http": Object({
1001                                    "status": Number(200),
1002                                }),
1003                                "apollo.private.subgraph.name": String(
1004                                    "subgraph_name",
1005                                ),
1006                            },
1007                            apollo_id: 00000000-0000-0000-0000-000000000000,
1008                        },
1009                        Error {
1010                            message: "Request failed",
1011                            locations: [],
1012                            path: Some(
1013                                Path(
1014                                    [
1015                                        Key(
1016                                            "_entities",
1017                                            None,
1018                                        ),
1019                                        Index(
1020                                            1,
1021                                        ),
1022                                    ],
1023                                ),
1024                            ),
1025                            extensions: {
1026                                "code": String(
1027                                    "CONNECTOR_FETCH",
1028                                ),
1029                                "service": String(
1030                                    "subgraph_name",
1031                                ),
1032                                "connector": Object({
1033                                    "coordinate": String(
1034                                        "subgraph_name:Query.user[0]",
1035                                    ),
1036                                }),
1037                                "http": Object({
1038                                    "status": Number(404),
1039                                }),
1040                                "apollo.private.subgraph.name": String(
1041                                    "subgraph_name",
1042                                ),
1043                            },
1044                            apollo_id: 00000000-0000-0000-0000-000000000000,
1045                        },
1046                        Error {
1047                            message: "Request failed",
1048                            locations: [],
1049                            path: Some(
1050                                Path(
1051                                    [
1052                                        Key(
1053                                            "_entities",
1054                                            None,
1055                                        ),
1056                                        Index(
1057                                            3,
1058                                        ),
1059                                    ],
1060                                ),
1061                            ),
1062                            extensions: {
1063                                "code": String(
1064                                    "CONNECTOR_FETCH",
1065                                ),
1066                                "service": String(
1067                                    "subgraph_name",
1068                                ),
1069                                "connector": Object({
1070                                    "coordinate": String(
1071                                        "subgraph_name:Query.user[0]",
1072                                    ),
1073                                }),
1074                                "http": Object({
1075                                    "status": Number(500),
1076                                }),
1077                                "apollo.private.subgraph.name": String(
1078                                    "subgraph_name",
1079                                ),
1080                            },
1081                            apollo_id: 00000000-0000-0000-0000-000000000000,
1082                        },
1083                    ],
1084                    extensions: {},
1085                    has_next: None,
1086                    subscribed: None,
1087                    created_at: None,
1088                    incremental: [],
1089                },
1090            },
1091        }
1092        "#);
1093    }
1094
1095    #[tokio::test]
1096    async fn test_handle_responses_status() {
1097        let selection = JSONSelection::parse("$status").unwrap();
1098        let connector = Arc::new(Connector {
1099            spec: ConnectSpec::V0_1,
1100            id: ConnectId::new(
1101                "subgraph_name".into(),
1102                None,
1103                name!(Query),
1104                name!(hello),
1105                None,
1106                0,
1107            ),
1108            transport: HttpJsonTransport {
1109                source_template: "http://localhost/api".parse().ok(),
1110                connect_template: "/path".parse().unwrap(),
1111                ..Default::default()
1112            },
1113            selection: selection.clone(),
1114            entity_resolver: None,
1115            config: Default::default(),
1116            max_requests: None,
1117            batch_settings: None,
1118            request_headers: Default::default(),
1119            response_headers: Default::default(),
1120            request_variable_keys: Default::default(),
1121            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1122            error_settings: Default::default(),
1123            label: "test label".into(),
1124        });
1125
1126        let response1: http::Response<RouterBody> = http::Response::builder()
1127            .status(201)
1128            .body(router::body::from_bytes(r#"{}"#))
1129            .unwrap();
1130        let response_key1 = ResponseKey::RootField {
1131            name: "hello".to_string(),
1132            inputs: Default::default(),
1133            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1134        };
1135
1136        let supergraph_request = Arc::new(
1137            http::Request::builder()
1138                .body(graphql::Request::builder().build())
1139                .unwrap(),
1140        );
1141
1142        let res = super::aggregate_responses(vec![
1143            process_response(
1144                Ok(response1),
1145                response_key1,
1146                connector,
1147                &Context::default(),
1148                (None, Default::default()),
1149                None,
1150                supergraph_request,
1151            )
1152            .await
1153            .mapped_response,
1154        ])
1155        .unwrap();
1156
1157        assert_debug_snapshot!(res, @r###"
1158        Response {
1159            response: Response {
1160                status: 200,
1161                version: HTTP/1.1,
1162                headers: {},
1163                body: Response {
1164                    label: None,
1165                    data: Some(
1166                        Object({
1167                            "hello": Number(201),
1168                        }),
1169                    ),
1170                    path: None,
1171                    errors: [],
1172                    extensions: {},
1173                    has_next: None,
1174                    subscribed: None,
1175                    created_at: None,
1176                    incremental: [],
1177                },
1178            },
1179        }
1180        "###);
1181    }
1182
1183    #[tokio::test]
1184    async fn test_handle_response_with_is_success() {
1185        let is_success = JSONSelection::parse("$status ->eq(400)").unwrap();
1186        let selection = JSONSelection::parse("$status").unwrap();
1187        let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
1188            message: Default::default(),
1189            source_extensions: Default::default(),
1190            connect_extensions: Default::default(),
1191            connect_is_success: Some(is_success.clone()),
1192        };
1193        let connector = Arc::new(Connector {
1194            spec: ConnectSpec::V0_1,
1195            id: ConnectId::new(
1196                "subgraph_name".into(),
1197                None,
1198                name!(Query),
1199                name!(hello),
1200                None,
1201                0,
1202            ),
1203            transport: HttpJsonTransport {
1204                source_template: "http://localhost/api".parse().ok(),
1205                connect_template: "/path".parse().unwrap(),
1206                ..Default::default()
1207            },
1208            selection: selection.clone(),
1209            entity_resolver: None,
1210            config: Default::default(),
1211            max_requests: None,
1212            batch_settings: None,
1213            request_headers: Default::default(),
1214            response_headers: Default::default(),
1215            request_variable_keys: Default::default(),
1216            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1217            error_settings,
1218            label: Label::from("test label"),
1219        });
1220
1221        // First request should be marked as error as status is NOT 400
1222        let response_fail: http::Response<RouterBody> = http::Response::builder()
1223            .status(201)
1224            .body(router::body::from_bytes(r#"{}"#))
1225            .unwrap();
1226        let response_fail_key = ResponseKey::RootField {
1227            name: "hello".to_string(),
1228            inputs: Default::default(),
1229            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1230        };
1231
1232        // Second response should be marked as a success as the status is 400!
1233        let response_succeed: http::Response<RouterBody> = http::Response::builder()
1234            .status(400)
1235            .body(router::body::from_bytes(r#"{}"#))
1236            .unwrap();
1237        let response_succeed_key = ResponseKey::RootField {
1238            name: "hello".to_string(),
1239            inputs: Default::default(),
1240            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1241        };
1242
1243        let supergraph_request = Arc::new(
1244            http::Request::builder()
1245                .body(graphql::Request::builder().build())
1246                .unwrap(),
1247        );
1248
1249        // Make failing request
1250        let res_expect_fail = super::aggregate_responses(vec![
1251            process_response(
1252                Ok(response_fail),
1253                response_fail_key,
1254                connector.clone(),
1255                &Context::default(),
1256                (None, Default::default()),
1257                None,
1258                supergraph_request.clone(),
1259            )
1260            .await
1261            .mapped_response,
1262        ])
1263        .unwrap()
1264        .response;
1265        assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
1266        assert_eq!(res_expect_fail.body().errors.len(), 1);
1267
1268        // Make succeeding request
1269        let res_expect_success = super::aggregate_responses(vec![
1270            process_response(
1271                Ok(response_succeed),
1272                response_succeed_key,
1273                connector.clone(),
1274                &Context::default(),
1275                (None, Default::default()),
1276                None,
1277                supergraph_request.clone(),
1278            )
1279            .await
1280            .mapped_response,
1281        ])
1282        .unwrap()
1283        .response;
1284        assert!(res_expect_success.body().errors.is_empty());
1285        assert_eq!(
1286            &res_expect_success.body().data,
1287            &Some(json!({"hello": json!(400)}))
1288        );
1289    }
1290}