Skip to main content

apollo_router/plugins/connectors/
handle_responses.rs

1use std::sync::Arc;
2
3use apollo_compiler::ExecutableDocument;
4use apollo_compiler::validation::Valid;
5use apollo_federation::connectors::Connector;
6use apollo_federation::connectors::runtime::debug::ConnectorContext;
7use apollo_federation::connectors::runtime::debug::DebugRequest;
8use apollo_federation::connectors::runtime::debug::SelectionData;
9use apollo_federation::connectors::runtime::errors::Error;
10use apollo_federation::connectors::runtime::errors::RuntimeError;
11use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
12use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
13use apollo_federation::connectors::runtime::key::ResponseKey;
14use apollo_federation::connectors::runtime::mapping::Problem;
15use apollo_federation::connectors::runtime::responses::HandleResponseError;
16use apollo_federation::connectors::runtime::responses::MappedResponse;
17use apollo_federation::connectors::runtime::responses::deserialize_response;
18use apollo_federation::connectors::runtime::responses::handle_raw_response;
19use axum::body::HttpBody;
20use http::response::Parts;
21use http_body_util::BodyExt;
22use opentelemetry::KeyValue;
23use parking_lot::Mutex;
24use serde_json_bytes::Map;
25use serde_json_bytes::Value;
26use tracing::Span;
27
28use crate::Context;
29use crate::graphql;
30use crate::json_ext::Path;
31use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
32use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
33use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
35use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
36use crate::plugins::telemetry::config_new::events::log_event;
37use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
38use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
39use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
40use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
41use crate::services::connect::Response;
42use crate::services::connector;
43use crate::services::fetch::AddSubgraphNameExt;
44
45// --- ERRORS ------------------------------------------------------------------
46
47impl From<RuntimeError> for graphql::Error {
48    fn from(error: RuntimeError) -> Self {
49        let path: Path = (&error.path).into();
50
51        let err = graphql::Error::builder()
52            .message(&error.message)
53            .extensions(error.extensions())
54            .extension_code(error.code())
55            .path(path)
56            .build();
57
58        if let Some(subgraph_name) = &error.subgraph_name {
59            err.with_subgraph_name(subgraph_name)
60        } else {
61            err
62        }
63    }
64}
65
66// --- handle_responses --------------------------------------------------------
67
68#[allow(clippy::too_many_arguments)]
69pub(crate) async fn process_response<T: HttpBody>(
70    result: Result<http::Response<T>, Error>,
71    response_key: ResponseKey,
72    connector: Arc<Connector>,
73    context: &Context,
74    debug_request: DebugRequest,
75    debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
76    supergraph_request: Arc<http::Request<crate::graphql::Request>>,
77    operation: Option<Arc<Valid<ExecutableDocument>>>,
78) -> connector::request_service::Response {
79    let (mapped_response, result) = match result {
80        // This occurs when we short-circuit the request when over the limit
81        Err(error) => {
82            Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
83            (
84                MappedResponse::Error {
85                    error: error.to_runtime_error(&connector, &response_key),
86                    key: response_key,
87                    problems: Vec::new(),
88                },
89                Err(error),
90            )
91        }
92        Ok(response) => {
93            let (parts, body) = response.into_parts();
94
95            let result = Ok(TransportResponse::Http(HttpResponse {
96                inner: parts.clone(),
97            }));
98
99            let make_err = || {
100                let mut err = RuntimeError::new(
101                    "The server returned data in an unexpected format.".to_string(),
102                    &response_key,
103                );
104                err.subgraph_name = Some(connector.id.subgraph_name.clone());
105                err = err.with_code("CONNECTOR_RESPONSE_INVALID");
106                err.coordinate = Some(connector.id.coordinate());
107                err = err.extension(
108                    "http",
109                    Value::Object(Map::from_iter([(
110                        "status".into(),
111                        Value::Number(parts.status.as_u16().into()),
112                    )])),
113                );
114                err
115            };
116
117            let deserialized_body = body
118                .collect()
119                .await
120                .map_err(|_| ())
121                .and_then(|body| {
122                    let body = body.to_bytes();
123                    let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
124                        if let Some(debug_context) = debug_context {
125                            debug_context.lock().push_invalid_response(
126                                debug_request.0.clone(),
127                                &parts,
128                                &body,
129                                &connector.error_settings,
130                                debug_request.1.clone(),
131                            );
132                        }
133                    });
134                    log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
135                    raw
136                })
137                .map_err(|()| make_err());
138
139            // If this errors, it will write to the debug context because it
140            // has access to the raw bytes, so we can't write to it again
141            // in any RawResponse::Error branches.
142            let mapped = match &deserialized_body {
143                Err(error) => MappedResponse::Error {
144                    error: error.clone(),
145                    key: response_key,
146                    problems: Vec::new(),
147                },
148                Ok(data) => handle_raw_response(
149                    data,
150                    &parts,
151                    response_key,
152                    &connector,
153                    context,
154                    supergraph_request.headers(),
155                )
156                .apply_operation(
157                    operation
158                        .as_ref()
159                        .map(|arc_valid_doc| arc_valid_doc.as_ref().as_ref()),
160                    &connector.schema_subtypes_map,
161                ),
162            };
163
164            if let Some(debug) = debug_context {
165                let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
166                debug_problems.extend(debug_request.1);
167
168                let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
169                    Some(SelectionData {
170                        source: connector.selection.to_string(),
171                        transformed: key.selection().to_string(),
172                        result: Some(data.clone()),
173                    })
174                } else {
175                    None
176                };
177
178                debug.lock().push_response(
179                    debug_request.0,
180                    &parts,
181                    deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
182                    selection_data,
183                    &connector.error_settings,
184                    debug_problems,
185                );
186            }
187            if matches!(mapped, MappedResponse::Data { .. }) {
188                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
189            } else {
190                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
191            }
192
193            (mapped, result)
194        }
195    };
196
197    if let MappedResponse::Error { ref error, .. } = mapped_response {
198        emit_error_event(error.code(), &error.message, Some((*error.path).into()));
199    }
200
201    connector::request_service::Response {
202        context: context.clone(),
203        transport_result: result,
204        mapped_response,
205    }
206}
207
208pub(crate) fn aggregate_responses(
209    responses: Vec<MappedResponse>,
210    _context: Context,
211) -> Result<Response, HandleResponseError> {
212    let mut data = serde_json_bytes::Map::new();
213    let mut errors = Vec::new();
214    let count = responses.len();
215
216    for mapped in responses {
217        mapped.add_to_data(&mut data, &mut errors, count)?;
218    }
219
220    let data = if data.is_empty() {
221        Value::Null
222    } else {
223        Value::Object(data)
224    };
225
226    Span::current().record(
227        OTEL_STATUS_CODE,
228        if errors.is_empty() {
229            OTEL_STATUS_CODE_OK
230        } else {
231            OTEL_STATUS_CODE_ERROR
232        },
233    );
234
235    Ok(Response {
236        response: http::Response::builder()
237            .body(
238                graphql::Response::builder()
239                    .data(data)
240                    .errors(errors.into_iter().map(|e| e.into()).collect())
241                    .build(),
242            )
243            .unwrap(),
244    })
245}
246
247fn log_connectors_event(
248    context: &Context,
249    body: &[u8],
250    parts: &Parts,
251    response_key: ResponseKey,
252    connector: &Connector,
253) {
254    let log_response_level = context
255        .extensions()
256        .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
257        .and_then(|event| {
258            // TODO: evaluate if this is still needed now that we're cloning the body anyway
259            // Create a temporary response here so we can evaluate the condition. This response
260            // is missing any information about the mapped response, because we don't have that
261            // yet. This means that we cannot correctly evaluate any condition that relies on
262            // the mapped response data or mapping problems. But we can't wait until we do have
263            // that information, because this is the only place we have the body bytes (without
264            // making an expensive clone of the body). So we either need to not expose any
265            // selector which can be used as a condition that requires mapping information, or
266            // we must document that such selectors cannot be used as conditions on standard
267            // connectors events.
268
269            let response = connector::request_service::Response {
270                context: context.clone(),
271                transport_result: Ok(TransportResponse::Http(HttpResponse {
272                    inner: parts.clone(),
273                })),
274                mapped_response: MappedResponse::Data {
275                    data: Value::Null,
276                    key: response_key,
277                    problems: vec![],
278                },
279            };
280            if event.condition.evaluate_response(&response) {
281                Some(event.level)
282            } else {
283                None
284            }
285        });
286
287    if let Some(level) = log_response_level {
288        let mut attrs = Vec::with_capacity(4);
289        #[cfg(test)]
290        let headers = {
291            let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
292                .headers
293                .iter()
294                .map(|(name, val)| (name.to_string(), val.clone()))
295                .collect();
296            headers.sort_keys();
297            headers
298        };
299        #[cfg(not(test))]
300        let headers = &parts.headers;
301
302        attrs.push(KeyValue::new(
303            HTTP_RESPONSE_HEADERS,
304            opentelemetry::Value::String(format!("{headers:?}").into()),
305        ));
306        attrs.push(KeyValue::new(
307            HTTP_RESPONSE_STATUS,
308            opentelemetry::Value::String(format!("{}", parts.status).into()),
309        ));
310        attrs.push(KeyValue::new(
311            HTTP_RESPONSE_VERSION,
312            opentelemetry::Value::String(format!("{:?}", parts.version).into()),
313        ));
314        attrs.push(KeyValue::new(
315            HTTP_RESPONSE_BODY,
316            opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
317        ));
318
319        log_event(
320            level,
321            "connector.response",
322            attrs,
323            &format!("Response from connector {label:?}", label = connector.label),
324        );
325    }
326}
327
328#[cfg(test)]
329mod tests {
330    use std::sync::Arc;
331
332    use apollo_compiler::Schema;
333    use apollo_compiler::collections::IndexMap;
334    use apollo_compiler::name;
335    use apollo_compiler::response::JsonValue;
336    use apollo_federation::connectors::ConnectId;
337    use apollo_federation::connectors::ConnectSpec;
338    use apollo_federation::connectors::Connector;
339    use apollo_federation::connectors::ConnectorErrorsSettings;
340    use apollo_federation::connectors::EntityResolver;
341    use apollo_federation::connectors::HTTPMethod;
342    use apollo_federation::connectors::HttpJsonTransport;
343    use apollo_federation::connectors::JSONSelection;
344    use apollo_federation::connectors::Label;
345    use apollo_federation::connectors::Namespace;
346    use apollo_federation::connectors::runtime::inputs::RequestInputs;
347    use apollo_federation::connectors::runtime::key::ResponseKey;
348    use insta::assert_debug_snapshot;
349    use itertools::Itertools;
350    use serde_json_bytes::json;
351
352    use crate::Context;
353    use crate::graphql;
354    use crate::plugins::connectors::handle_responses::process_response;
355    use crate::services::router;
356    use crate::services::router::body::RouterBody;
357
358    #[tokio::test]
359    async fn test_handle_responses_root_fields() {
360        let connector = Arc::new(Connector {
361            spec: ConnectSpec::V0_1,
362            schema_subtypes_map: Default::default(),
363            id: ConnectId::new(
364                "subgraph_name".into(),
365                None,
366                name!(Query),
367                name!(hello),
368                None,
369                0,
370            ),
371            transport: HttpJsonTransport {
372                source_template: "http://localhost/api".parse().ok(),
373                connect_template: "/path".parse().unwrap(),
374                ..Default::default()
375            },
376            selection: JSONSelection::parse("$.data").unwrap(),
377            entity_resolver: None,
378            config: Default::default(),
379            max_requests: None,
380            batch_settings: None,
381            request_headers: Default::default(),
382            response_headers: Default::default(),
383            request_variable_keys: Default::default(),
384            response_variable_keys: Default::default(),
385            error_settings: Default::default(),
386            label: "test label".into(),
387        });
388
389        let response1: http::Response<RouterBody> = http::Response::builder()
390            .body(router::body::from_bytes(r#"{"data":"world"}"#))
391            .unwrap();
392        let response_key1 = ResponseKey::RootField {
393            name: "hello".to_string(),
394            inputs: Default::default(),
395            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
396        };
397
398        let response2 = http::Response::builder()
399            .body(router::body::from_bytes(r#"{"data":"world"}"#))
400            .unwrap();
401        let response_key2 = ResponseKey::RootField {
402            name: "hello2".to_string(),
403            inputs: Default::default(),
404            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
405        };
406
407        let supergraph_request = Arc::new(
408            http::Request::builder()
409                .body(graphql::Request::builder().build())
410                .unwrap(),
411        );
412
413        let res = super::aggregate_responses(
414            vec![
415                process_response(
416                    Ok(response1),
417                    response_key1,
418                    connector.clone(),
419                    &Context::default(),
420                    (None, Default::default()),
421                    None,
422                    supergraph_request.clone(),
423                    Default::default(),
424                )
425                .await
426                .mapped_response,
427                process_response(
428                    Ok(response2),
429                    response_key2,
430                    connector,
431                    &Context::default(),
432                    (None, Default::default()),
433                    None,
434                    supergraph_request,
435                    Default::default(),
436                )
437                .await
438                .mapped_response,
439            ],
440            Context::new(),
441        )
442        .unwrap();
443
444        assert_debug_snapshot!(res.response, @r#"
445        Response {
446            status: 200,
447            version: HTTP/1.1,
448            headers: {},
449            body: Response {
450                label: None,
451                data: Some(
452                    Object({
453                        "hello": String(
454                            "world",
455                        ),
456                        "hello2": String(
457                            "world",
458                        ),
459                    }),
460                ),
461                path: None,
462                errors: [],
463                extensions: {},
464                has_next: None,
465                subscribed: None,
466                created_at: None,
467                incremental: [],
468            },
469        }
470        "#);
471    }
472
473    #[tokio::test]
474    async fn test_handle_responses_entities() {
475        let connector = Arc::new(Connector {
476            spec: ConnectSpec::V0_1,
477            schema_subtypes_map: Default::default(),
478            id: ConnectId::new(
479                "subgraph_name".into(),
480                None,
481                name!(Query),
482                name!(user),
483                None,
484                0,
485            ),
486            transport: HttpJsonTransport {
487                source_template: "http://localhost/api".parse().ok(),
488                connect_template: "/path".parse().unwrap(),
489                ..Default::default()
490            },
491            selection: JSONSelection::parse("$.data { id }").unwrap(),
492            entity_resolver: Some(EntityResolver::Explicit),
493            config: Default::default(),
494            max_requests: None,
495            batch_settings: None,
496            request_headers: Default::default(),
497            response_headers: Default::default(),
498            request_variable_keys: Default::default(),
499            response_variable_keys: Default::default(),
500            error_settings: Default::default(),
501            label: "test label".into(),
502        });
503
504        let response1: http::Response<RouterBody> = http::Response::builder()
505            .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
506            .unwrap();
507        let response_key1 = ResponseKey::Entity {
508            index: 0,
509            inputs: Default::default(),
510            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
511        };
512
513        let response2 = http::Response::builder()
514            .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
515            .unwrap();
516        let response_key2 = ResponseKey::Entity {
517            index: 1,
518            inputs: Default::default(),
519            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
520        };
521
522        let supergraph_request = Arc::new(
523            http::Request::builder()
524                .body(graphql::Request::builder().build())
525                .unwrap(),
526        );
527
528        let res = super::aggregate_responses(
529            vec![
530                process_response(
531                    Ok(response1),
532                    response_key1,
533                    connector.clone(),
534                    &Context::default(),
535                    (None, Default::default()),
536                    None,
537                    supergraph_request.clone(),
538                    Default::default(),
539                )
540                .await
541                .mapped_response,
542                process_response(
543                    Ok(response2),
544                    response_key2,
545                    connector,
546                    &Context::default(),
547                    (None, Default::default()),
548                    None,
549                    supergraph_request,
550                    Default::default(),
551                )
552                .await
553                .mapped_response,
554            ],
555            Context::new(),
556        )
557        .unwrap();
558
559        assert_debug_snapshot!(res.response, @r#"
560        Response {
561            status: 200,
562            version: HTTP/1.1,
563            headers: {},
564            body: Response {
565                label: None,
566                data: Some(
567                    Object({
568                        "_entities": Array([
569                            Object({
570                                "id": String(
571                                    "1",
572                                ),
573                            }),
574                            Object({
575                                "id": String(
576                                    "2",
577                                ),
578                            }),
579                        ]),
580                    }),
581                ),
582                path: None,
583                errors: [],
584                extensions: {},
585                has_next: None,
586                subscribed: None,
587                created_at: None,
588                incremental: [],
589            },
590        }
591        "#);
592    }
593
594    #[tokio::test]
595    async fn test_handle_responses_batch() {
596        let connector = Arc::new(Connector {
597            spec: ConnectSpec::V0_2,
598            id: ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
599            schema_subtypes_map: Default::default(),
600            transport: HttpJsonTransport {
601                source_template: "http://localhost/api".parse().ok(),
602                connect_template: "/path".parse().unwrap(),
603                method: HTTPMethod::Post,
604                body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
605                ..Default::default()
606            },
607            selection: JSONSelection::parse("$.data { id name }").unwrap(),
608            entity_resolver: Some(EntityResolver::TypeBatch),
609            config: Default::default(),
610            max_requests: None,
611            batch_settings: None,
612            request_headers: Default::default(),
613            response_headers: Default::default(),
614            request_variable_keys: Default::default(),
615            response_variable_keys: Default::default(),
616            error_settings: Default::default(),
617            label: "test label".into(),
618        });
619
620        let keys = connector
621            .resolvable_key(
622                &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
623                    .unwrap(),
624            )
625            .unwrap()
626            .unwrap();
627
628        let response1: http::Response<RouterBody> = http::Response::builder()
629            // different order from the request inputs
630            .body(router::body::from_bytes(
631                r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
632            ))
633            .unwrap();
634
635        let mut inputs: RequestInputs = RequestInputs::default();
636        let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
637        inputs.batch = representations
638            .as_array()
639            .unwrap()
640            .iter()
641            .map(|v| v.as_object().unwrap().clone())
642            .collect_vec();
643
644        let response_key1 = ResponseKey::BatchEntity {
645            selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
646            keys,
647            inputs,
648        };
649
650        let supergraph_request = Arc::new(
651            http::Request::builder()
652                .body(graphql::Request::builder().build())
653                .unwrap(),
654        );
655
656        let res = super::aggregate_responses(
657            vec![
658                process_response(
659                    Ok(response1),
660                    response_key1,
661                    connector.clone(),
662                    &Context::default(),
663                    (None, Default::default()),
664                    None,
665                    supergraph_request,
666                    Default::default(),
667                )
668                .await
669                .mapped_response,
670            ],
671            Context::new(),
672        )
673        .unwrap();
674
675        assert_debug_snapshot!(res.response, @r#"
676        Response {
677            status: 200,
678            version: HTTP/1.1,
679            headers: {},
680            body: Response {
681                label: None,
682                data: Some(
683                    Object({
684                        "_entities": Array([
685                            Object({
686                                "id": String(
687                                    "1",
688                                ),
689                                "name": String(
690                                    "A",
691                                ),
692                            }),
693                            Object({
694                                "id": String(
695                                    "2",
696                                ),
697                                "name": String(
698                                    "B",
699                                ),
700                            }),
701                        ]),
702                    }),
703                ),
704                path: None,
705                errors: [],
706                extensions: {},
707                has_next: None,
708                subscribed: None,
709                created_at: None,
710                incremental: [],
711            },
712        }
713        "#);
714    }
715
716    #[tokio::test]
717    async fn test_handle_responses_entity_field() {
718        let connector = Arc::new(Connector {
719            spec: ConnectSpec::V0_1,
720            schema_subtypes_map: Default::default(),
721            id: ConnectId::new(
722                "subgraph_name".into(),
723                None,
724                name!(User),
725                name!(field),
726                None,
727                0,
728            ),
729            transport: HttpJsonTransport {
730                source_template: "http://localhost/api".parse().ok(),
731                connect_template: "/path".parse().unwrap(),
732                ..Default::default()
733            },
734            selection: JSONSelection::parse("$.data").unwrap(),
735            entity_resolver: Some(EntityResolver::Implicit),
736            config: Default::default(),
737            max_requests: None,
738            batch_settings: None,
739            request_headers: Default::default(),
740            response_headers: Default::default(),
741            request_variable_keys: Default::default(),
742            response_variable_keys: Default::default(),
743            error_settings: Default::default(),
744            label: "test label".into(),
745        });
746
747        let response1: http::Response<RouterBody> = http::Response::builder()
748            .body(router::body::from_bytes(r#"{"data":"value1"}"#))
749            .unwrap();
750        let response_key1 = ResponseKey::EntityField {
751            index: 0,
752            inputs: Default::default(),
753            field_name: "field".to_string(),
754            typename: Some(name!("User")),
755            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
756        };
757
758        let response2 = http::Response::builder()
759            .body(router::body::from_bytes(r#"{"data":"value2"}"#))
760            .unwrap();
761        let response_key2 = ResponseKey::EntityField {
762            index: 1,
763            inputs: Default::default(),
764            field_name: "field".to_string(),
765            typename: Some(name!("User")),
766            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
767        };
768
769        let supergraph_request = Arc::new(
770            http::Request::builder()
771                .body(graphql::Request::builder().build())
772                .unwrap(),
773        );
774
775        let res = super::aggregate_responses(
776            vec![
777                process_response(
778                    Ok(response1),
779                    response_key1,
780                    connector.clone(),
781                    &Context::default(),
782                    (None, Default::default()),
783                    None,
784                    supergraph_request.clone(),
785                    Default::default(),
786                )
787                .await
788                .mapped_response,
789                process_response(
790                    Ok(response2),
791                    response_key2,
792                    connector,
793                    &Context::default(),
794                    (None, Default::default()),
795                    None,
796                    supergraph_request,
797                    Default::default(),
798                )
799                .await
800                .mapped_response,
801            ],
802            Context::new(),
803        )
804        .unwrap();
805
806        assert_debug_snapshot!(res.response, @r#"
807        Response {
808            status: 200,
809            version: HTTP/1.1,
810            headers: {},
811            body: Response {
812                label: None,
813                data: Some(
814                    Object({
815                        "_entities": Array([
816                            Object({
817                                "__typename": String(
818                                    "User",
819                                ),
820                                "field": String(
821                                    "value1",
822                                ),
823                            }),
824                            Object({
825                                "__typename": String(
826                                    "User",
827                                ),
828                                "field": String(
829                                    "value2",
830                                ),
831                            }),
832                        ]),
833                    }),
834                ),
835                path: None,
836                errors: [],
837                extensions: {},
838                has_next: None,
839                subscribed: None,
840                created_at: None,
841                incremental: [],
842            },
843        }
844        "#);
845    }
846
847    #[tokio::test]
    /// Feeds four entity responses for a single connector through `process_response`
    /// and `aggregate_responses`, then snapshots the aggregated response.
    ///
    /// Only entity 2 is expected to resolve. The snapshot expects `null` at entity
    /// indexes 0, 1 and 3, plus one error per failed entity whose path is
    /// `_entities` followed by that entity's index.
848    async fn test_handle_responses_errors() {
        // Connector on `Query.user` in "subgraph_name" with an explicit entity resolver;
        // its selection reads `$.data` from each response body.
849        let connector = Arc::new(Connector {
850            spec: ConnectSpec::V0_1,
851            schema_subtypes_map: Default::default(),
852            id: ConnectId::new(
853                "subgraph_name".into(),
854                None,
855                name!(Query),
856                name!(user),
857                None,
858                0,
859            ),
860            transport: HttpJsonTransport {
861                source_template: "http://localhost/api".parse().ok(),
862                connect_template: "/path".parse().unwrap(),
863                ..Default::default()
864            },
865            selection: JSONSelection::parse("$.data").unwrap(),
866            entity_resolver: Some(EntityResolver::Explicit),
867            config: Default::default(),
868            max_requests: None,
869            batch_settings: None,
870            request_headers: Default::default(),
871            response_headers: Default::default(),
872            request_variable_keys: Default::default(),
873            response_variable_keys: Default::default(),
874            error_settings: Default::default(),
875            label: "test label".into(),
876        });
877
        // Entity 0: the body is not JSON and no status is set on the builder
        // (the expected error in the snapshot reports HTTP status 200).
878        let response_plaintext: http::Response<RouterBody> = http::Response::builder()
879            .body(router::body::from_bytes(r#"plain text"#))
880            .unwrap();
881        let response_key_plaintext = ResponseKey::Entity {
882            index: 0,
883            inputs: Default::default(),
884            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
885        };
886
        // Entity 1: HTTP 404 with a JSON error body.
887        let response1: http::Response<RouterBody> = http::Response::builder()
888            .status(404)
889            .body(router::body::from_bytes(r#"{"error":"not found"}"#))
890            .unwrap();
891        let response_key1 = ResponseKey::Entity {
892            index: 1,
893            inputs: Default::default(),
894            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
895        };
896
        // Entity 2: no explicit status and a JSON body containing the selected `data`
        // key; this is the only entity the snapshot expects to resolve.
897        let response2 = http::Response::builder()
898            .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
899            .unwrap();
900        let response_key2 = ResponseKey::Entity {
901            index: 2,
902            inputs: Default::default(),
903            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
904        };
905
        // Entity 3: HTTP 500 with a JSON error body.
906        let response3: http::Response<RouterBody> = http::Response::builder()
907            .status(500)
908            .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
909            .unwrap();
910        let response_key3 = ResponseKey::Entity {
911            index: 3,
912            inputs: Default::default(),
913            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
914        };
915
        // Supergraph request built entirely from defaults; the same Arc is handed to
        // every `process_response` call below.
916        let supergraph_request = Arc::new(
917            http::Request::builder()
918                .body(graphql::Request::builder().build())
919                .unwrap(),
920        );
921
        // Process the responses one after another, in entity-index order, and aggregate
        // the mapped results. Between calls only the response, its key, and the
        // connector / supergraph-request handles differ; every other argument is a
        // default or `None`. `res` is mutable so the error IDs can be rewritten below.
922        let mut res = super::aggregate_responses(
923            vec![
924                process_response(
925                    Ok(response_plaintext),
926                    response_key_plaintext,
927                    connector.clone(),
928                    &Context::default(),
929                    (None, Default::default()),
930                    None,
931                    supergraph_request.clone(),
932                    Default::default(),
933                )
934                .await
935                .mapped_response,
936                process_response(
937                    Ok(response1),
938                    response_key1,
939                    connector.clone(),
940                    &Context::default(),
941                    (None, Default::default()),
942                    None,
943                    supergraph_request.clone(),
944                    Default::default(),
945                )
946                .await
947                .mapped_response,
948                process_response(
949                    Ok(response2),
950                    response_key2,
951                    connector.clone(),
952                    &Context::default(),
953                    (None, Default::default()),
954                    None,
955                    supergraph_request.clone(),
956                    Default::default(),
957                )
958                .await
959                .mapped_response,
960                process_response(
961                    Ok(response3),
962                    response_key3,
963                    connector,
964                    &Context::default(),
965                    (None, Default::default()),
966                    None,
967                    supergraph_request,
968                    Default::default(),
969                )
970                .await
971                .mapped_response,
972            ],
973            Context::new(),
974        )
975        .unwrap();
976
977        // Overwrite error IDs to avoid random Uuid mismatch.
978        // Since assert_debug_snapshot does not support redactions (which would be useful for error IDs),
979        // we have to do it manually.
980        let body = res.response.body_mut();
981        body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();
982
        // Expected shape: `_entities` is [null, null, {"id": "2"}, null]; the errors are
        // CONNECTOR_RESPONSE_INVALID for entity 0 and CONNECTOR_FETCH for entities 1
        // and 3, each carrying the HTTP status of its own response.
983        assert_debug_snapshot!(res.response, @r#"
984        Response {
985            status: 200,
986            version: HTTP/1.1,
987            headers: {},
988            body: Response {
989                label: None,
990                data: Some(
991                    Object({
992                        "_entities": Array([
993                            Null,
994                            Null,
995                            Object({
996                                "id": String(
997                                    "2",
998                                ),
999                            }),
1000                            Null,
1001                        ]),
1002                    }),
1003                ),
1004                path: None,
1005                errors: [
1006                    Error {
1007                        message: "The server returned data in an unexpected format.",
1008                        locations: [],
1009                        path: Some(
1010                            Path(
1011                                [
1012                                    Key(
1013                                        "_entities",
1014                                        None,
1015                                    ),
1016                                    Index(
1017                                        0,
1018                                    ),
1019                                ],
1020                            ),
1021                        ),
1022                        extensions: {
1023                            "code": String(
1024                                "CONNECTOR_RESPONSE_INVALID",
1025                            ),
1026                            "service": String(
1027                                "subgraph_name",
1028                            ),
1029                            "connector": Object({
1030                                "coordinate": String(
1031                                    "subgraph_name:Query.user[0]",
1032                                ),
1033                            }),
1034                            "http": Object({
1035                                "status": Number(200),
1036                            }),
1037                            "apollo.private.subgraph.name": String(
1038                                "subgraph_name",
1039                            ),
1040                        },
1041                        apollo_id: 00000000-0000-0000-0000-000000000000,
1042                    },
1043                    Error {
1044                        message: "Request failed",
1045                        locations: [],
1046                        path: Some(
1047                            Path(
1048                                [
1049                                    Key(
1050                                        "_entities",
1051                                        None,
1052                                    ),
1053                                    Index(
1054                                        1,
1055                                    ),
1056                                ],
1057                            ),
1058                        ),
1059                        extensions: {
1060                            "code": String(
1061                                "CONNECTOR_FETCH",
1062                            ),
1063                            "service": String(
1064                                "subgraph_name",
1065                            ),
1066                            "connector": Object({
1067                                "coordinate": String(
1068                                    "subgraph_name:Query.user[0]",
1069                                ),
1070                            }),
1071                            "http": Object({
1072                                "status": Number(404),
1073                            }),
1074                            "apollo.private.subgraph.name": String(
1075                                "subgraph_name",
1076                            ),
1077                        },
1078                        apollo_id: 00000000-0000-0000-0000-000000000000,
1079                    },
1080                    Error {
1081                        message: "Request failed",
1082                        locations: [],
1083                        path: Some(
1084                            Path(
1085                                [
1086                                    Key(
1087                                        "_entities",
1088                                        None,
1089                                    ),
1090                                    Index(
1091                                        3,
1092                                    ),
1093                                ],
1094                            ),
1095                        ),
1096                        extensions: {
1097                            "code": String(
1098                                "CONNECTOR_FETCH",
1099                            ),
1100                            "service": String(
1101                                "subgraph_name",
1102                            ),
1103                            "connector": Object({
1104                                "coordinate": String(
1105                                    "subgraph_name:Query.user[0]",
1106                                ),
1107                            }),
1108                            "http": Object({
1109                                "status": Number(500),
1110                            }),
1111                            "apollo.private.subgraph.name": String(
1112                                "subgraph_name",
1113                            ),
1114                        },
1115                        apollo_id: 00000000-0000-0000-0000-000000000000,
1116                    },
1117                ],
1118                extensions: {},
1119                has_next: None,
1120                subscribed: None,
1121                created_at: None,
1122                incremental: [],
1123            },
1124        }
1125        "#);
1126    }
1127
1128    #[tokio::test]
    /// Checks that a `$status` selection maps the connector's HTTP status code into
    /// the field value: a 201 response for root field `hello` is expected to yield
    /// `{"hello": 201}` with no errors.
1129    async fn test_handle_responses_status() {
1130        let selection = JSONSelection::parse("$status").unwrap();
        // Connector on `Query.hello` with no entity resolver. `response_variable_keys`
        // lists `Namespace::Status`, the namespace of the `$status` variable selected above.
1131        let connector = Arc::new(Connector {
1132            spec: ConnectSpec::V0_1,
1133            schema_subtypes_map: Default::default(),
1134            id: ConnectId::new(
1135                "subgraph_name".into(),
1136                None,
1137                name!(Query),
1138                name!(hello),
1139                None,
1140                0,
1141            ),
1142            transport: HttpJsonTransport {
1143                source_template: "http://localhost/api".parse().ok(),
1144                connect_template: "/path".parse().unwrap(),
1145                ..Default::default()
1146            },
1147            selection: selection.clone(),
1148            entity_resolver: None,
1149            config: Default::default(),
1150            max_requests: None,
1151            batch_settings: None,
1152            request_headers: Default::default(),
1153            response_headers: Default::default(),
1154            request_variable_keys: Default::default(),
1155            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1156            error_settings: Default::default(),
1157            label: "test label".into(),
1158        });
1159
        // HTTP 201 with an empty JSON object body; the selection reads `$status`,
        // not any key of the body.
1160        let response1: http::Response<RouterBody> = http::Response::builder()
1161            .status(201)
1162            .body(router::body::from_bytes(r#"{}"#))
1163            .unwrap();
        // The response key carries its own parse of the same "$status" selection.
1164        let response_key1 = ResponseKey::RootField {
1165            name: "hello".to_string(),
1166            inputs: Default::default(),
1167            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1168        };
1169
        // Supergraph request built entirely from defaults.
1170        let supergraph_request = Arc::new(
1171            http::Request::builder()
1172                .body(graphql::Request::builder().build())
1173                .unwrap(),
1174        );
1175
        // Single response: process it and aggregate the one mapped result.
1176        let res = super::aggregate_responses(
1177            vec![
1178                process_response(
1179                    Ok(response1),
1180                    response_key1,
1181                    connector,
1182                    &Context::default(),
1183                    (None, Default::default()),
1184                    None,
1185                    supergraph_request,
1186                    Default::default(),
1187                )
1188                .await
1189                .mapped_response,
1190            ],
1191            Context::new(),
1192        )
1193        .unwrap();
1194
        // The aggregated response itself reports status 200; the connector's 201 only
        // appears as the value of `hello`.
1195        assert_debug_snapshot!(res.response, @r#"
1196        Response {
1197            status: 200,
1198            version: HTTP/1.1,
1199            headers: {},
1200            body: Response {
1201                label: None,
1202                data: Some(
1203                    Object({
1204                        "hello": Number(201),
1205                    }),
1206                ),
1207                path: None,
1208                errors: [],
1209                extensions: {},
1210                has_next: None,
1211                subscribed: None,
1212                created_at: None,
1213                incremental: [],
1214            },
1215        }
1216        "#);
1217    }
1218
1219    #[tokio::test]
    /// Checks that the connector's `connect_is_success` expression, rather than the
    /// HTTP status class, decides whether a response counts as an error.
    ///
    /// With `$status ->eq(400)` configured, a 201 response is expected to produce
    /// null data and exactly one error, while a 400 response is expected to map
    /// normally to `{"hello": 400}` with no errors.
1220    async fn test_handle_response_with_is_success() {
        // Success criterion under test: the response status must equal 400.
1221        let is_success = JSONSelection::parse("$status ->eq(400)").unwrap();
1222        let selection = JSONSelection::parse("$status").unwrap();
        // Only `connect_is_success` is set; every other error setting stays at its default.
1223        let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
1224            message: Default::default(),
1225            source_extensions: Default::default(),
1226            connect_extensions: Default::default(),
1227            connect_is_success: Some(is_success.clone()),
1228        };
        // Connector on `Query.hello` that selects `$status` and carries the custom
        // error settings built above.
1229        let connector = Arc::new(Connector {
1230            spec: ConnectSpec::V0_1,
1231            schema_subtypes_map: Default::default(),
1232            id: ConnectId::new(
1233                "subgraph_name".into(),
1234                None,
1235                name!(Query),
1236                name!(hello),
1237                None,
1238                0,
1239            ),
1240            transport: HttpJsonTransport {
1241                source_template: "http://localhost/api".parse().ok(),
1242                connect_template: "/path".parse().unwrap(),
1243                ..Default::default()
1244            },
1245            selection: selection.clone(),
1246            entity_resolver: None,
1247            config: Default::default(),
1248            max_requests: None,
1249            batch_settings: None,
1250            request_headers: Default::default(),
1251            response_headers: Default::default(),
1252            request_variable_keys: Default::default(),
1253            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1254            error_settings,
1255            label: Label::from("test label"),
1256        });
1257
1258        // First request should be marked as error as status is NOT 400
1259        let response_fail: http::Response<RouterBody> = http::Response::builder()
1260            .status(201)
1261            .body(router::body::from_bytes(r#"{}"#))
1262            .unwrap();
1263        let response_fail_key = ResponseKey::RootField {
1264            name: "hello".to_string(),
1265            inputs: Default::default(),
1266            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1267        };
1268
1269        // Second response should be marked as a success as the status is 400!
1270        let response_succeed: http::Response<RouterBody> = http::Response::builder()
1271            .status(400)
1272            .body(router::body::from_bytes(r#"{}"#))
1273            .unwrap();
1274        let response_succeed_key = ResponseKey::RootField {
1275            name: "hello".to_string(),
1276            inputs: Default::default(),
1277            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1278        };
1279
        // Supergraph request built entirely from defaults; shared by both calls below.
1280        let supergraph_request = Arc::new(
1281            http::Request::builder()
1282                .body(graphql::Request::builder().build())
1283                .unwrap(),
1284        );
1285
1286        // Make failing request
1287        let res_expect_fail = super::aggregate_responses(
1288            vec![
1289                process_response(
1290                    Ok(response_fail),
1291                    response_fail_key,
1292                    connector.clone(),
1293                    &Context::default(),
1294                    (None, Default::default()),
1295                    None,
1296                    supergraph_request.clone(),
1297                    Default::default(),
1298                )
1299                .await
1300                .mapped_response,
1301            ],
1302            Context::new(),
1303        )
1304        .unwrap()
1305        .response;
        // The 201 response fails the `eq(400)` check: data is null and one error is reported.
1306        assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
1307        assert_eq!(res_expect_fail.body().errors.len(), 1);
1308
1309        // Make succeeding request
1310        let res_expect_success = super::aggregate_responses(
1311            vec![
1312                process_response(
1313                    Ok(response_succeed),
1314                    response_succeed_key,
1315                    connector.clone(),
1316                    &Context::default(),
1317                    (None, Default::default()),
1318                    None,
1319                    supergraph_request.clone(),
1320                    Default::default(),
1321                )
1322                .await
1323                .mapped_response,
1324            ],
1325            Context::new(),
1326        )
1327        .unwrap()
1328        .response;
        // The 400 response passes the check, so `$status` is mapped into `hello` as usual.
1329        assert!(res_expect_success.body().errors.is_empty());
1330        assert_eq!(
1331            &res_expect_success.body().data,
1332            &Some(json!({"hello": json!(400)}))
1333        );
1334    }
1335}