apollo_router/plugins/connectors/
handle_responses.rs

1use std::sync::Arc;
2
3use apollo_compiler::ExecutableDocument;
4use apollo_compiler::validation::Valid;
5use apollo_federation::connectors::Connector;
6use apollo_federation::connectors::runtime::debug::ConnectorContext;
7use apollo_federation::connectors::runtime::debug::DebugRequest;
8use apollo_federation::connectors::runtime::debug::SelectionData;
9use apollo_federation::connectors::runtime::errors::Error;
10use apollo_federation::connectors::runtime::errors::RuntimeError;
11use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
12use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
13use apollo_federation::connectors::runtime::key::ResponseKey;
14use apollo_federation::connectors::runtime::mapping::Problem;
15use apollo_federation::connectors::runtime::responses::HandleResponseError;
16use apollo_federation::connectors::runtime::responses::MappedResponse;
17use apollo_federation::connectors::runtime::responses::deserialize_response;
18use apollo_federation::connectors::runtime::responses::handle_raw_response;
19use axum::body::HttpBody;
20use http::response::Parts;
21use http_body_util::BodyExt;
22use opentelemetry::KeyValue;
23use parking_lot::Mutex;
24use serde_json_bytes::Map;
25use serde_json_bytes::Value;
26use tracing::Span;
27
28use crate::Context;
29use crate::graphql;
30use crate::json_ext::Path;
31use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
32use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
33use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
35use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
36use crate::plugins::telemetry::config_new::events::log_event;
37use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
38use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
39use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
40use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
41use crate::services::connect::Response;
42use crate::services::connector;
43use crate::services::fetch::AddSubgraphNameExt;
44
45// --- ERRORS ------------------------------------------------------------------
46
47impl From<RuntimeError> for graphql::Error {
48    fn from(error: RuntimeError) -> Self {
49        let path: Path = (&error.path).into();
50
51        let err = graphql::Error::builder()
52            .message(&error.message)
53            .extensions(error.extensions())
54            .extension_code(error.code())
55            .path(path)
56            .build();
57
58        if let Some(subgraph_name) = &error.subgraph_name {
59            err.with_subgraph_name(subgraph_name)
60        } else {
61            err
62        }
63    }
64}
65
66// --- handle_responses --------------------------------------------------------
67
68#[allow(clippy::too_many_arguments)]
69pub(crate) async fn process_response<T: HttpBody>(
70    result: Result<http::Response<T>, Error>,
71    response_key: ResponseKey,
72    connector: Arc<Connector>,
73    context: &Context,
74    debug_request: DebugRequest,
75    debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
76    supergraph_request: Arc<http::Request<crate::graphql::Request>>,
77    operation: Option<Arc<Valid<ExecutableDocument>>>,
78) -> connector::request_service::Response {
79    let (mapped_response, result) = match result {
80        // This occurs when we short-circuit the request when over the limit
81        Err(error) => {
82            Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
83            (
84                MappedResponse::Error {
85                    error: error.to_runtime_error(&connector, &response_key),
86                    key: response_key,
87                    problems: Vec::new(),
88                },
89                Err(error),
90            )
91        }
92        Ok(response) => {
93            let (parts, body) = response.into_parts();
94
95            let result = Ok(TransportResponse::Http(HttpResponse {
96                inner: parts.clone(),
97            }));
98
99            let make_err = || {
100                let mut err = RuntimeError::new(
101                    "The server returned data in an unexpected format.".to_string(),
102                    &response_key,
103                );
104                err.subgraph_name = Some(connector.id.subgraph_name.clone());
105                err = err.with_code("CONNECTOR_RESPONSE_INVALID");
106                err.coordinate = Some(connector.id.coordinate());
107                err = err.extension(
108                    "http",
109                    Value::Object(Map::from_iter([(
110                        "status".into(),
111                        Value::Number(parts.status.as_u16().into()),
112                    )])),
113                );
114                err
115            };
116
117            let deserialized_body = body
118                .collect()
119                .await
120                .map_err(|_| ())
121                .and_then(|body| {
122                    let body = body.to_bytes();
123                    let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
124                        if let Some(debug_context) = debug_context {
125                            debug_context.lock().push_invalid_response(
126                                debug_request.0.clone(),
127                                &parts,
128                                &body,
129                                &connector.error_settings,
130                                debug_request.1.clone(),
131                            );
132                        }
133                    });
134                    log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
135                    raw
136                })
137                .map_err(|()| make_err());
138
139            // If this errors, it will write to the debug context because it
140            // has access to the raw bytes, so we can't write to it again
141            // in any RawResponse::Error branches.
142            let mapped = match &deserialized_body {
143                Err(error) => MappedResponse::Error {
144                    error: error.clone(),
145                    key: response_key,
146                    problems: Vec::new(),
147                },
148                Ok(data) => handle_raw_response(
149                    data,
150                    &parts,
151                    response_key,
152                    &connector,
153                    context,
154                    supergraph_request.headers(),
155                )
156                .apply_operation(
157                    operation
158                        .as_ref()
159                        .map(|arc_valid_doc| arc_valid_doc.as_ref().as_ref()),
160                    &connector.schema_subtypes_map,
161                ),
162            };
163
164            if let Some(debug) = debug_context {
165                let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
166                debug_problems.extend(debug_request.1);
167
168                let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
169                    Some(SelectionData {
170                        source: connector.selection.to_string(),
171                        transformed: key.selection().to_string(),
172                        result: Some(data.clone()),
173                    })
174                } else {
175                    None
176                };
177
178                debug.lock().push_response(
179                    debug_request.0,
180                    &parts,
181                    deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
182                    selection_data,
183                    &connector.error_settings,
184                    debug_problems,
185                );
186            }
187            if matches!(mapped, MappedResponse::Data { .. }) {
188                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
189            } else {
190                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
191            }
192
193            (mapped, result)
194        }
195    };
196
197    if let MappedResponse::Error { ref error, .. } = mapped_response {
198        emit_error_event(error.code(), &error.message, Some((*error.path).into()));
199    }
200
201    connector::request_service::Response {
202        transport_result: result,
203        mapped_response,
204    }
205}
206
207pub(crate) fn aggregate_responses(
208    responses: Vec<MappedResponse>,
209    _context: Context,
210) -> Result<Response, HandleResponseError> {
211    let mut data = serde_json_bytes::Map::new();
212    let mut errors = Vec::new();
213    let count = responses.len();
214
215    for mapped in responses {
216        mapped.add_to_data(&mut data, &mut errors, count)?;
217    }
218
219    let data = if data.is_empty() {
220        Value::Null
221    } else {
222        Value::Object(data)
223    };
224
225    Span::current().record(
226        OTEL_STATUS_CODE,
227        if errors.is_empty() {
228            OTEL_STATUS_CODE_OK
229        } else {
230            OTEL_STATUS_CODE_ERROR
231        },
232    );
233
234    Ok(Response {
235        response: http::Response::builder()
236            .body(
237                graphql::Response::builder()
238                    .data(data)
239                    .errors(errors.into_iter().map(|e| e.into()).collect())
240                    .build(),
241            )
242            .unwrap(),
243    })
244}
245
246fn log_connectors_event(
247    context: &Context,
248    body: &[u8],
249    parts: &Parts,
250    response_key: ResponseKey,
251    connector: &Connector,
252) {
253    let log_response_level = context
254        .extensions()
255        .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
256        .and_then(|event| {
257            // TODO: evaluate if this is still needed now that we're cloning the body anyway
258            // Create a temporary response here so we can evaluate the condition. This response
259            // is missing any information about the mapped response, because we don't have that
260            // yet. This means that we cannot correctly evaluate any condition that relies on
261            // the mapped response data or mapping problems. But we can't wait until we do have
262            // that information, because this is the only place we have the body bytes (without
263            // making an expensive clone of the body). So we either need to not expose any
264            // selector which can be used as a condition that requires mapping information, or
265            // we must document that such selectors cannot be used as conditions on standard
266            // connectors events.
267
268            let response = connector::request_service::Response {
269                transport_result: Ok(TransportResponse::Http(HttpResponse {
270                    inner: parts.clone(),
271                })),
272                mapped_response: MappedResponse::Data {
273                    data: Value::Null,
274                    key: response_key,
275                    problems: vec![],
276                },
277            };
278            if event.condition.evaluate_response(&response) {
279                Some(event.level)
280            } else {
281                None
282            }
283        });
284
285    if let Some(level) = log_response_level {
286        let mut attrs = Vec::with_capacity(4);
287        #[cfg(test)]
288        let headers = {
289            let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
290                .headers
291                .iter()
292                .map(|(name, val)| (name.to_string(), val.clone()))
293                .collect();
294            headers.sort_keys();
295            headers
296        };
297        #[cfg(not(test))]
298        let headers = &parts.headers;
299
300        attrs.push(KeyValue::new(
301            HTTP_RESPONSE_HEADERS,
302            opentelemetry::Value::String(format!("{headers:?}").into()),
303        ));
304        attrs.push(KeyValue::new(
305            HTTP_RESPONSE_STATUS,
306            opentelemetry::Value::String(format!("{}", parts.status).into()),
307        ));
308        attrs.push(KeyValue::new(
309            HTTP_RESPONSE_VERSION,
310            opentelemetry::Value::String(format!("{:?}", parts.version).into()),
311        ));
312        attrs.push(KeyValue::new(
313            HTTP_RESPONSE_BODY,
314            opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
315        ));
316
317        log_event(
318            level,
319            "connector.response",
320            attrs,
321            &format!("Response from connector {label:?}", label = connector.label),
322        );
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use std::sync::Arc;
329
330    use apollo_compiler::Schema;
331    use apollo_compiler::collections::IndexMap;
332    use apollo_compiler::name;
333    use apollo_compiler::response::JsonValue;
334    use apollo_federation::connectors::ConnectId;
335    use apollo_federation::connectors::ConnectSpec;
336    use apollo_federation::connectors::Connector;
337    use apollo_federation::connectors::ConnectorErrorsSettings;
338    use apollo_federation::connectors::EntityResolver;
339    use apollo_federation::connectors::HTTPMethod;
340    use apollo_federation::connectors::HttpJsonTransport;
341    use apollo_federation::connectors::JSONSelection;
342    use apollo_federation::connectors::Label;
343    use apollo_federation::connectors::Namespace;
344    use apollo_federation::connectors::runtime::inputs::RequestInputs;
345    use apollo_federation::connectors::runtime::key::ResponseKey;
346    use insta::assert_debug_snapshot;
347    use itertools::Itertools;
348    use serde_json_bytes::json;
349
350    use crate::Context;
351    use crate::graphql;
352    use crate::plugins::connectors::handle_responses::process_response;
353    use crate::services::router;
354    use crate::services::router::body::RouterBody;
355
356    #[tokio::test]
357    async fn test_handle_responses_root_fields() {
358        let connector = Arc::new(Connector {
359            spec: ConnectSpec::V0_1,
360            schema_subtypes_map: Default::default(),
361            id: ConnectId::new(
362                "subgraph_name".into(),
363                None,
364                name!(Query),
365                name!(hello),
366                None,
367                0,
368            ),
369            transport: HttpJsonTransport {
370                source_template: "http://localhost/api".parse().ok(),
371                connect_template: "/path".parse().unwrap(),
372                ..Default::default()
373            },
374            selection: JSONSelection::parse("$.data").unwrap(),
375            entity_resolver: None,
376            config: Default::default(),
377            max_requests: None,
378            batch_settings: None,
379            request_headers: Default::default(),
380            response_headers: Default::default(),
381            request_variable_keys: Default::default(),
382            response_variable_keys: Default::default(),
383            error_settings: Default::default(),
384            label: "test label".into(),
385        });
386
387        let response1: http::Response<RouterBody> = http::Response::builder()
388            .body(router::body::from_bytes(r#"{"data":"world"}"#))
389            .unwrap();
390        let response_key1 = ResponseKey::RootField {
391            name: "hello".to_string(),
392            inputs: Default::default(),
393            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
394        };
395
396        let response2 = http::Response::builder()
397            .body(router::body::from_bytes(r#"{"data":"world"}"#))
398            .unwrap();
399        let response_key2 = ResponseKey::RootField {
400            name: "hello2".to_string(),
401            inputs: Default::default(),
402            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
403        };
404
405        let supergraph_request = Arc::new(
406            http::Request::builder()
407                .body(graphql::Request::builder().build())
408                .unwrap(),
409        );
410
411        let res = super::aggregate_responses(
412            vec![
413                process_response(
414                    Ok(response1),
415                    response_key1,
416                    connector.clone(),
417                    &Context::default(),
418                    (None, Default::default()),
419                    None,
420                    supergraph_request.clone(),
421                    Default::default(),
422                )
423                .await
424                .mapped_response,
425                process_response(
426                    Ok(response2),
427                    response_key2,
428                    connector,
429                    &Context::default(),
430                    (None, Default::default()),
431                    None,
432                    supergraph_request,
433                    Default::default(),
434                )
435                .await
436                .mapped_response,
437            ],
438            Context::new(),
439        )
440        .unwrap();
441
442        assert_debug_snapshot!(res.response, @r#"
443        Response {
444            status: 200,
445            version: HTTP/1.1,
446            headers: {},
447            body: Response {
448                label: None,
449                data: Some(
450                    Object({
451                        "hello": String(
452                            "world",
453                        ),
454                        "hello2": String(
455                            "world",
456                        ),
457                    }),
458                ),
459                path: None,
460                errors: [],
461                extensions: {},
462                has_next: None,
463                subscribed: None,
464                created_at: None,
465                incremental: [],
466            },
467        }
468        "#);
469    }
470
471    #[tokio::test]
472    async fn test_handle_responses_entities() {
473        let connector = Arc::new(Connector {
474            spec: ConnectSpec::V0_1,
475            schema_subtypes_map: Default::default(),
476            id: ConnectId::new(
477                "subgraph_name".into(),
478                None,
479                name!(Query),
480                name!(user),
481                None,
482                0,
483            ),
484            transport: HttpJsonTransport {
485                source_template: "http://localhost/api".parse().ok(),
486                connect_template: "/path".parse().unwrap(),
487                ..Default::default()
488            },
489            selection: JSONSelection::parse("$.data { id }").unwrap(),
490            entity_resolver: Some(EntityResolver::Explicit),
491            config: Default::default(),
492            max_requests: None,
493            batch_settings: None,
494            request_headers: Default::default(),
495            response_headers: Default::default(),
496            request_variable_keys: Default::default(),
497            response_variable_keys: Default::default(),
498            error_settings: Default::default(),
499            label: "test label".into(),
500        });
501
502        let response1: http::Response<RouterBody> = http::Response::builder()
503            .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
504            .unwrap();
505        let response_key1 = ResponseKey::Entity {
506            index: 0,
507            inputs: Default::default(),
508            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
509        };
510
511        let response2 = http::Response::builder()
512            .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
513            .unwrap();
514        let response_key2 = ResponseKey::Entity {
515            index: 1,
516            inputs: Default::default(),
517            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
518        };
519
520        let supergraph_request = Arc::new(
521            http::Request::builder()
522                .body(graphql::Request::builder().build())
523                .unwrap(),
524        );
525
526        let res = super::aggregate_responses(
527            vec![
528                process_response(
529                    Ok(response1),
530                    response_key1,
531                    connector.clone(),
532                    &Context::default(),
533                    (None, Default::default()),
534                    None,
535                    supergraph_request.clone(),
536                    Default::default(),
537                )
538                .await
539                .mapped_response,
540                process_response(
541                    Ok(response2),
542                    response_key2,
543                    connector,
544                    &Context::default(),
545                    (None, Default::default()),
546                    None,
547                    supergraph_request,
548                    Default::default(),
549                )
550                .await
551                .mapped_response,
552            ],
553            Context::new(),
554        )
555        .unwrap();
556
557        assert_debug_snapshot!(res.response, @r#"
558        Response {
559            status: 200,
560            version: HTTP/1.1,
561            headers: {},
562            body: Response {
563                label: None,
564                data: Some(
565                    Object({
566                        "_entities": Array([
567                            Object({
568                                "id": String(
569                                    "1",
570                                ),
571                            }),
572                            Object({
573                                "id": String(
574                                    "2",
575                                ),
576                            }),
577                        ]),
578                    }),
579                ),
580                path: None,
581                errors: [],
582                extensions: {},
583                has_next: None,
584                subscribed: None,
585                created_at: None,
586                incremental: [],
587            },
588        }
589        "#);
590    }
591
592    #[tokio::test]
593    async fn test_handle_responses_batch() {
594        let connector = Arc::new(Connector {
595            spec: ConnectSpec::V0_2,
596            id: ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
597            schema_subtypes_map: Default::default(),
598            transport: HttpJsonTransport {
599                source_template: "http://localhost/api".parse().ok(),
600                connect_template: "/path".parse().unwrap(),
601                method: HTTPMethod::Post,
602                body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
603                ..Default::default()
604            },
605            selection: JSONSelection::parse("$.data { id name }").unwrap(),
606            entity_resolver: Some(EntityResolver::TypeBatch),
607            config: Default::default(),
608            max_requests: None,
609            batch_settings: None,
610            request_headers: Default::default(),
611            response_headers: Default::default(),
612            request_variable_keys: Default::default(),
613            response_variable_keys: Default::default(),
614            error_settings: Default::default(),
615            label: "test label".into(),
616        });
617
618        let keys = connector
619            .resolvable_key(
620                &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
621                    .unwrap(),
622            )
623            .unwrap()
624            .unwrap();
625
626        let response1: http::Response<RouterBody> = http::Response::builder()
627            // different order from the request inputs
628            .body(router::body::from_bytes(
629                r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
630            ))
631            .unwrap();
632
633        let mut inputs: RequestInputs = RequestInputs::default();
634        let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
635        inputs.batch = representations
636            .as_array()
637            .unwrap()
638            .iter()
639            .cloned()
640            .map(|v| v.as_object().unwrap().clone())
641            .collect_vec();
642
643        let response_key1 = ResponseKey::BatchEntity {
644            selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
645            keys,
646            inputs,
647        };
648
649        let supergraph_request = Arc::new(
650            http::Request::builder()
651                .body(graphql::Request::builder().build())
652                .unwrap(),
653        );
654
655        let res = super::aggregate_responses(
656            vec![
657                process_response(
658                    Ok(response1),
659                    response_key1,
660                    connector.clone(),
661                    &Context::default(),
662                    (None, Default::default()),
663                    None,
664                    supergraph_request,
665                    Default::default(),
666                )
667                .await
668                .mapped_response,
669            ],
670            Context::new(),
671        )
672        .unwrap();
673
674        assert_debug_snapshot!(res.response, @r#"
675        Response {
676            status: 200,
677            version: HTTP/1.1,
678            headers: {},
679            body: Response {
680                label: None,
681                data: Some(
682                    Object({
683                        "_entities": Array([
684                            Object({
685                                "id": String(
686                                    "1",
687                                ),
688                                "name": String(
689                                    "A",
690                                ),
691                            }),
692                            Object({
693                                "id": String(
694                                    "2",
695                                ),
696                                "name": String(
697                                    "B",
698                                ),
699                            }),
700                        ]),
701                    }),
702                ),
703                path: None,
704                errors: [],
705                extensions: {},
706                has_next: None,
707                subscribed: None,
708                created_at: None,
709                incremental: [],
710            },
711        }
712        "#);
713    }
714
715    #[tokio::test]
716    async fn test_handle_responses_entity_field() {
717        let connector = Arc::new(Connector {
718            spec: ConnectSpec::V0_1,
719            schema_subtypes_map: Default::default(),
720            id: ConnectId::new(
721                "subgraph_name".into(),
722                None,
723                name!(User),
724                name!(field),
725                None,
726                0,
727            ),
728            transport: HttpJsonTransport {
729                source_template: "http://localhost/api".parse().ok(),
730                connect_template: "/path".parse().unwrap(),
731                ..Default::default()
732            },
733            selection: JSONSelection::parse("$.data").unwrap(),
734            entity_resolver: Some(EntityResolver::Implicit),
735            config: Default::default(),
736            max_requests: None,
737            batch_settings: None,
738            request_headers: Default::default(),
739            response_headers: Default::default(),
740            request_variable_keys: Default::default(),
741            response_variable_keys: Default::default(),
742            error_settings: Default::default(),
743            label: "test label".into(),
744        });
745
746        let response1: http::Response<RouterBody> = http::Response::builder()
747            .body(router::body::from_bytes(r#"{"data":"value1"}"#))
748            .unwrap();
749        let response_key1 = ResponseKey::EntityField {
750            index: 0,
751            inputs: Default::default(),
752            field_name: "field".to_string(),
753            typename: Some(name!("User")),
754            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
755        };
756
757        let response2 = http::Response::builder()
758            .body(router::body::from_bytes(r#"{"data":"value2"}"#))
759            .unwrap();
760        let response_key2 = ResponseKey::EntityField {
761            index: 1,
762            inputs: Default::default(),
763            field_name: "field".to_string(),
764            typename: Some(name!("User")),
765            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
766        };
767
768        let supergraph_request = Arc::new(
769            http::Request::builder()
770                .body(graphql::Request::builder().build())
771                .unwrap(),
772        );
773
774        let res = super::aggregate_responses(
775            vec![
776                process_response(
777                    Ok(response1),
778                    response_key1,
779                    connector.clone(),
780                    &Context::default(),
781                    (None, Default::default()),
782                    None,
783                    supergraph_request.clone(),
784                    Default::default(),
785                )
786                .await
787                .mapped_response,
788                process_response(
789                    Ok(response2),
790                    response_key2,
791                    connector,
792                    &Context::default(),
793                    (None, Default::default()),
794                    None,
795                    supergraph_request,
796                    Default::default(),
797                )
798                .await
799                .mapped_response,
800            ],
801            Context::new(),
802        )
803        .unwrap();
804
805        assert_debug_snapshot!(res.response, @r#"
806        Response {
807            status: 200,
808            version: HTTP/1.1,
809            headers: {},
810            body: Response {
811                label: None,
812                data: Some(
813                    Object({
814                        "_entities": Array([
815                            Object({
816                                "__typename": String(
817                                    "User",
818                                ),
819                                "field": String(
820                                    "value1",
821                                ),
822                            }),
823                            Object({
824                                "__typename": String(
825                                    "User",
826                                ),
827                                "field": String(
828                                    "value2",
829                                ),
830                            }),
831                        ]),
832                    }),
833                ),
834                path: None,
835                errors: [],
836                extensions: {},
837                has_next: None,
838                subscribed: None,
839                created_at: None,
840                incremental: [],
841            },
842        }
843        "#);
844    }
845
846    #[tokio::test]
847    async fn test_handle_responses_errors() {
848        let connector = Arc::new(Connector {
849            spec: ConnectSpec::V0_1,
850            schema_subtypes_map: Default::default(),
851            id: ConnectId::new(
852                "subgraph_name".into(),
853                None,
854                name!(Query),
855                name!(user),
856                None,
857                0,
858            ),
859            transport: HttpJsonTransport {
860                source_template: "http://localhost/api".parse().ok(),
861                connect_template: "/path".parse().unwrap(),
862                ..Default::default()
863            },
864            selection: JSONSelection::parse("$.data").unwrap(),
865            entity_resolver: Some(EntityResolver::Explicit),
866            config: Default::default(),
867            max_requests: None,
868            batch_settings: None,
869            request_headers: Default::default(),
870            response_headers: Default::default(),
871            request_variable_keys: Default::default(),
872            response_variable_keys: Default::default(),
873            error_settings: Default::default(),
874            label: "test label".into(),
875        });
876
877        let response_plaintext: http::Response<RouterBody> = http::Response::builder()
878            .body(router::body::from_bytes(r#"plain text"#))
879            .unwrap();
880        let response_key_plaintext = ResponseKey::Entity {
881            index: 0,
882            inputs: Default::default(),
883            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
884        };
885
886        let response1: http::Response<RouterBody> = http::Response::builder()
887            .status(404)
888            .body(router::body::from_bytes(r#"{"error":"not found"}"#))
889            .unwrap();
890        let response_key1 = ResponseKey::Entity {
891            index: 1,
892            inputs: Default::default(),
893            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
894        };
895
896        let response2 = http::Response::builder()
897            .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
898            .unwrap();
899        let response_key2 = ResponseKey::Entity {
900            index: 2,
901            inputs: Default::default(),
902            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
903        };
904
905        let response3: http::Response<RouterBody> = http::Response::builder()
906            .status(500)
907            .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
908            .unwrap();
909        let response_key3 = ResponseKey::Entity {
910            index: 3,
911            inputs: Default::default(),
912            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
913        };
914
915        let supergraph_request = Arc::new(
916            http::Request::builder()
917                .body(graphql::Request::builder().build())
918                .unwrap(),
919        );
920
921        let mut res = super::aggregate_responses(
922            vec![
923                process_response(
924                    Ok(response_plaintext),
925                    response_key_plaintext,
926                    connector.clone(),
927                    &Context::default(),
928                    (None, Default::default()),
929                    None,
930                    supergraph_request.clone(),
931                    Default::default(),
932                )
933                .await
934                .mapped_response,
935                process_response(
936                    Ok(response1),
937                    response_key1,
938                    connector.clone(),
939                    &Context::default(),
940                    (None, Default::default()),
941                    None,
942                    supergraph_request.clone(),
943                    Default::default(),
944                )
945                .await
946                .mapped_response,
947                process_response(
948                    Ok(response2),
949                    response_key2,
950                    connector.clone(),
951                    &Context::default(),
952                    (None, Default::default()),
953                    None,
954                    supergraph_request.clone(),
955                    Default::default(),
956                )
957                .await
958                .mapped_response,
959                process_response(
960                    Ok(response3),
961                    response_key3,
962                    connector,
963                    &Context::default(),
964                    (None, Default::default()),
965                    None,
966                    supergraph_request,
967                    Default::default(),
968                )
969                .await
970                .mapped_response,
971            ],
972            Context::new(),
973        )
974        .unwrap();
975
976        // Overwrite error IDs to avoid random Uuid mismatch.
977        // Since assert_debug_snapshot does not support redactions (which would be useful for error IDs),
978        // we have to do it manually.
979        let body = res.response.body_mut();
980        body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();
981
982        assert_debug_snapshot!(res.response, @r#"
983        Response {
984            status: 200,
985            version: HTTP/1.1,
986            headers: {},
987            body: Response {
988                label: None,
989                data: Some(
990                    Object({
991                        "_entities": Array([
992                            Null,
993                            Null,
994                            Object({
995                                "id": String(
996                                    "2",
997                                ),
998                            }),
999                            Null,
1000                        ]),
1001                    }),
1002                ),
1003                path: None,
1004                errors: [
1005                    Error {
1006                        message: "The server returned data in an unexpected format.",
1007                        locations: [],
1008                        path: Some(
1009                            Path(
1010                                [
1011                                    Key(
1012                                        "_entities",
1013                                        None,
1014                                    ),
1015                                    Index(
1016                                        0,
1017                                    ),
1018                                ],
1019                            ),
1020                        ),
1021                        extensions: {
1022                            "code": String(
1023                                "CONNECTOR_RESPONSE_INVALID",
1024                            ),
1025                            "service": String(
1026                                "subgraph_name",
1027                            ),
1028                            "connector": Object({
1029                                "coordinate": String(
1030                                    "subgraph_name:Query.user[0]",
1031                                ),
1032                            }),
1033                            "http": Object({
1034                                "status": Number(200),
1035                            }),
1036                            "apollo.private.subgraph.name": String(
1037                                "subgraph_name",
1038                            ),
1039                        },
1040                        apollo_id: 00000000-0000-0000-0000-000000000000,
1041                    },
1042                    Error {
1043                        message: "Request failed",
1044                        locations: [],
1045                        path: Some(
1046                            Path(
1047                                [
1048                                    Key(
1049                                        "_entities",
1050                                        None,
1051                                    ),
1052                                    Index(
1053                                        1,
1054                                    ),
1055                                ],
1056                            ),
1057                        ),
1058                        extensions: {
1059                            "code": String(
1060                                "CONNECTOR_FETCH",
1061                            ),
1062                            "service": String(
1063                                "subgraph_name",
1064                            ),
1065                            "connector": Object({
1066                                "coordinate": String(
1067                                    "subgraph_name:Query.user[0]",
1068                                ),
1069                            }),
1070                            "http": Object({
1071                                "status": Number(404),
1072                            }),
1073                            "apollo.private.subgraph.name": String(
1074                                "subgraph_name",
1075                            ),
1076                        },
1077                        apollo_id: 00000000-0000-0000-0000-000000000000,
1078                    },
1079                    Error {
1080                        message: "Request failed",
1081                        locations: [],
1082                        path: Some(
1083                            Path(
1084                                [
1085                                    Key(
1086                                        "_entities",
1087                                        None,
1088                                    ),
1089                                    Index(
1090                                        3,
1091                                    ),
1092                                ],
1093                            ),
1094                        ),
1095                        extensions: {
1096                            "code": String(
1097                                "CONNECTOR_FETCH",
1098                            ),
1099                            "service": String(
1100                                "subgraph_name",
1101                            ),
1102                            "connector": Object({
1103                                "coordinate": String(
1104                                    "subgraph_name:Query.user[0]",
1105                                ),
1106                            }),
1107                            "http": Object({
1108                                "status": Number(500),
1109                            }),
1110                            "apollo.private.subgraph.name": String(
1111                                "subgraph_name",
1112                            ),
1113                        },
1114                        apollo_id: 00000000-0000-0000-0000-000000000000,
1115                    },
1116                ],
1117                extensions: {},
1118                has_next: None,
1119                subscribed: None,
1120                created_at: None,
1121                incremental: [],
1122            },
1123        }
1124        "#);
1125    }
1126
1127    #[tokio::test]
1128    async fn test_handle_responses_status() {
1129        let selection = JSONSelection::parse("$status").unwrap();
1130        let connector = Arc::new(Connector {
1131            spec: ConnectSpec::V0_1,
1132            schema_subtypes_map: Default::default(),
1133            id: ConnectId::new(
1134                "subgraph_name".into(),
1135                None,
1136                name!(Query),
1137                name!(hello),
1138                None,
1139                0,
1140            ),
1141            transport: HttpJsonTransport {
1142                source_template: "http://localhost/api".parse().ok(),
1143                connect_template: "/path".parse().unwrap(),
1144                ..Default::default()
1145            },
1146            selection: selection.clone(),
1147            entity_resolver: None,
1148            config: Default::default(),
1149            max_requests: None,
1150            batch_settings: None,
1151            request_headers: Default::default(),
1152            response_headers: Default::default(),
1153            request_variable_keys: Default::default(),
1154            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1155            error_settings: Default::default(),
1156            label: "test label".into(),
1157        });
1158
1159        let response1: http::Response<RouterBody> = http::Response::builder()
1160            .status(201)
1161            .body(router::body::from_bytes(r#"{}"#))
1162            .unwrap();
1163        let response_key1 = ResponseKey::RootField {
1164            name: "hello".to_string(),
1165            inputs: Default::default(),
1166            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1167        };
1168
1169        let supergraph_request = Arc::new(
1170            http::Request::builder()
1171                .body(graphql::Request::builder().build())
1172                .unwrap(),
1173        );
1174
1175        let res = super::aggregate_responses(
1176            vec![
1177                process_response(
1178                    Ok(response1),
1179                    response_key1,
1180                    connector,
1181                    &Context::default(),
1182                    (None, Default::default()),
1183                    None,
1184                    supergraph_request,
1185                    Default::default(),
1186                )
1187                .await
1188                .mapped_response,
1189            ],
1190            Context::new(),
1191        )
1192        .unwrap();
1193
1194        assert_debug_snapshot!(res.response, @r#"
1195        Response {
1196            status: 200,
1197            version: HTTP/1.1,
1198            headers: {},
1199            body: Response {
1200                label: None,
1201                data: Some(
1202                    Object({
1203                        "hello": Number(201),
1204                    }),
1205                ),
1206                path: None,
1207                errors: [],
1208                extensions: {},
1209                has_next: None,
1210                subscribed: None,
1211                created_at: None,
1212                incremental: [],
1213            },
1214        }
1215        "#);
1216    }
1217
1218    #[tokio::test]
1219    async fn test_handle_response_with_is_success() {
1220        let is_success = JSONSelection::parse("$status ->eq(400)").unwrap();
1221        let selection = JSONSelection::parse("$status").unwrap();
1222        let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
1223            message: Default::default(),
1224            source_extensions: Default::default(),
1225            connect_extensions: Default::default(),
1226            connect_is_success: Some(is_success.clone()),
1227        };
1228        let connector = Arc::new(Connector {
1229            spec: ConnectSpec::V0_1,
1230            schema_subtypes_map: Default::default(),
1231            id: ConnectId::new(
1232                "subgraph_name".into(),
1233                None,
1234                name!(Query),
1235                name!(hello),
1236                None,
1237                0,
1238            ),
1239            transport: HttpJsonTransport {
1240                source_template: "http://localhost/api".parse().ok(),
1241                connect_template: "/path".parse().unwrap(),
1242                ..Default::default()
1243            },
1244            selection: selection.clone(),
1245            entity_resolver: None,
1246            config: Default::default(),
1247            max_requests: None,
1248            batch_settings: None,
1249            request_headers: Default::default(),
1250            response_headers: Default::default(),
1251            request_variable_keys: Default::default(),
1252            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1253            error_settings,
1254            label: Label::from("test label"),
1255        });
1256
1257        // First request should be marked as error as status is NOT 400
1258        let response_fail: http::Response<RouterBody> = http::Response::builder()
1259            .status(201)
1260            .body(router::body::from_bytes(r#"{}"#))
1261            .unwrap();
1262        let response_fail_key = ResponseKey::RootField {
1263            name: "hello".to_string(),
1264            inputs: Default::default(),
1265            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1266        };
1267
1268        // Second response should be marked as a success as the status is 400!
1269        let response_succeed: http::Response<RouterBody> = http::Response::builder()
1270            .status(400)
1271            .body(router::body::from_bytes(r#"{}"#))
1272            .unwrap();
1273        let response_succeed_key = ResponseKey::RootField {
1274            name: "hello".to_string(),
1275            inputs: Default::default(),
1276            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1277        };
1278
1279        let supergraph_request = Arc::new(
1280            http::Request::builder()
1281                .body(graphql::Request::builder().build())
1282                .unwrap(),
1283        );
1284
1285        // Make failing request
1286        let res_expect_fail = super::aggregate_responses(
1287            vec![
1288                process_response(
1289                    Ok(response_fail),
1290                    response_fail_key,
1291                    connector.clone(),
1292                    &Context::default(),
1293                    (None, Default::default()),
1294                    None,
1295                    supergraph_request.clone(),
1296                    Default::default(),
1297                )
1298                .await
1299                .mapped_response,
1300            ],
1301            Context::new(),
1302        )
1303        .unwrap()
1304        .response;
1305        assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
1306        assert_eq!(res_expect_fail.body().errors.len(), 1);
1307
1308        // Make succeeding request
1309        let res_expect_success = super::aggregate_responses(
1310            vec![
1311                process_response(
1312                    Ok(response_succeed),
1313                    response_succeed_key,
1314                    connector.clone(),
1315                    &Context::default(),
1316                    (None, Default::default()),
1317                    None,
1318                    supergraph_request.clone(),
1319                    Default::default(),
1320                )
1321                .await
1322                .mapped_response,
1323            ],
1324            Context::new(),
1325        )
1326        .unwrap()
1327        .response;
1328        assert!(res_expect_success.body().errors.is_empty());
1329        assert_eq!(
1330            &res_expect_success.body().data,
1331            &Some(json!({"hello": json!(400)}))
1332        );
1333    }
1334}