Skip to main content

apollo_router/plugins/connectors/
handle_responses.rs

1use std::sync::Arc;
2
3use apollo_compiler::ExecutableDocument;
4use apollo_compiler::validation::Valid;
5use apollo_federation::connectors::Connector;
6use apollo_federation::connectors::runtime::debug::ConnectorContext;
7use apollo_federation::connectors::runtime::debug::DebugRequest;
8use apollo_federation::connectors::runtime::debug::SelectionData;
9use apollo_federation::connectors::runtime::errors::Error;
10use apollo_federation::connectors::runtime::errors::RuntimeError;
11use apollo_federation::connectors::runtime::http_json_transport::HttpResponse;
12use apollo_federation::connectors::runtime::http_json_transport::TransportResponse;
13use apollo_federation::connectors::runtime::key::ResponseKey;
14use apollo_federation::connectors::runtime::mapping::Problem;
15use apollo_federation::connectors::runtime::responses::HandleResponseError;
16use apollo_federation::connectors::runtime::responses::MappedResponse;
17use apollo_federation::connectors::runtime::responses::deserialize_response;
18use apollo_federation::connectors::runtime::responses::handle_raw_response;
19use axum::body::HttpBody;
20use http::response::Parts;
21use http_body_util::BodyExt;
22use http_body_util::LengthLimitError;
23use http_body_util::Limited;
24use opentelemetry::KeyValue;
25use parking_lot::Mutex;
26use serde_json_bytes::Map;
27use serde_json_bytes::Value;
28use tracing::Span;
29
30use crate::Context;
31use crate::graphql;
32use crate::json_ext::Path;
33use crate::plugins::limits::ConnectorResponseSizeLimit;
34use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_BODY;
35use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_HEADERS;
36use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_STATUS;
37use crate::plugins::telemetry::config_new::attributes::HTTP_RESPONSE_VERSION;
38use crate::plugins::telemetry::config_new::connector::events::ConnectorEventResponse;
39use crate::plugins::telemetry::config_new::events::log_event;
40use crate::plugins::telemetry::consts::OTEL_STATUS_CODE;
41use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_ERROR;
42use crate::plugins::telemetry::consts::OTEL_STATUS_CODE_OK;
43use crate::plugins::telemetry::tracing::apollo_telemetry::emit_error_event;
44use crate::services::connect::Response;
45use crate::services::connector;
46use crate::services::fetch::AddSubgraphNameExt;
47
48// --- ERRORS ------------------------------------------------------------------
49
50impl From<RuntimeError> for graphql::Error {
51    fn from(error: RuntimeError) -> Self {
52        let path: Path = (&error.path).into();
53
54        let err = graphql::Error::builder()
55            .message(&error.message)
56            .extensions(error.extensions())
57            .extension_code(error.code())
58            .path(path)
59            .build();
60
61        if let Some(subgraph_name) = &error.subgraph_name {
62            err.with_subgraph_name(subgraph_name)
63        } else {
64            err
65        }
66    }
67}
68
69// --- handle_responses --------------------------------------------------------
70
71#[allow(clippy::too_many_arguments)]
72pub(crate) async fn process_response<T>(
73    result: Result<http::Response<T>, Error>,
74    response_key: ResponseKey,
75    connector: Arc<Connector>,
76    context: &Context,
77    debug_request: DebugRequest,
78    debug_context: Option<&Arc<Mutex<ConnectorContext>>>,
79    supergraph_request: Arc<http::Request<crate::graphql::Request>>,
80    operation: Option<Arc<Valid<ExecutableDocument>>>,
81) -> connector::request_service::Response
82where
83    T: HttpBody,
84    T::Error: Into<tower::BoxError>,
85{
86    let (mapped_response, result) = match result {
87        // This occurs when we short-circuit the request when over the limit
88        Err(error) => {
89            Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
90            (
91                MappedResponse::Error {
92                    error: error.to_runtime_error(&connector, &response_key),
93                    key: response_key,
94                    problems: Vec::new(),
95                },
96                Err(error),
97            )
98        }
99        Ok(response) => {
100            let (parts, body) = response.into_parts();
101
102            let result = Ok(TransportResponse::Http(HttpResponse {
103                inner: parts.clone(),
104            }));
105
106            let make_err = |message: String, code: &str| -> Box<RuntimeError> {
107                let mut err = RuntimeError::new(message, &response_key);
108                err.subgraph_name = Some(connector.id.subgraph_name.clone());
109                err = err.with_code(code);
110                err.coordinate = Some(connector.id.coordinate());
111                err = err.extension(
112                    "http",
113                    Value::Object(Map::from_iter([(
114                        "status".into(),
115                        Value::Number(parts.status.as_u16().into()),
116                    )])),
117                );
118                Box::new(err)
119            };
120
121            let make_invalid_response_err = || {
122                make_err(
123                    "The server returned data in an unexpected format.".to_string(),
124                    "CONNECTOR_RESPONSE_INVALID",
125                )
126            };
127
128            let make_limit_err = |limit: usize| {
129                make_err(
130                    format!("connector response body exceeded limit of {limit} bytes"),
131                    "CONNECTOR_RESPONSE_SIZE_LIMIT_EXCEEDED",
132                )
133            };
134
135            let response_size_limit = context
136                .extensions()
137                .with_lock(|e| e.get::<ConnectorResponseSizeLimit>().copied());
138
139            let body_result: Result<_, Box<RuntimeError>> = match response_size_limit {
140                Some(ConnectorResponseSizeLimit(limit)) => {
141                    Limited::new(body, limit)
142                        .collect()
143                        .await
144                        .map_err(|e| {
145                            if e.downcast_ref::<LengthLimitError>().is_some() {
146                                u64_counter!(
147                                    "apollo.router.limits.connector_response_size.exceeded",
148                                    "Number of connector responses aborted because they exceeded the configured response size limit",
149                                    1,
150                                    "connector.source" = connector.source_config_key()
151                                );
152                                tracing::Span::current()
153                                    .record("apollo.connector.response.aborted", "response_size_limit");
154                                make_limit_err(limit)
155                            } else {
156                                make_invalid_response_err()
157                            }
158                        })
159                }
160                None => body
161                    .collect()
162                    .await
163                    .map_err(|_| make_invalid_response_err()),
164            };
165
166            let deserialized_body = body_result.and_then(|body| {
167                let body = body.to_bytes();
168                let raw = deserialize_response(&body, &parts.headers).map_err(|_| {
169                    if let Some(debug_context) = debug_context {
170                        debug_context.lock().push_invalid_response(
171                            debug_request.0.clone(),
172                            &parts,
173                            &body,
174                            &connector.error_settings,
175                            debug_request.1.clone(),
176                        );
177                    }
178                    make_invalid_response_err()
179                });
180                log_connectors_event(context, &body, &parts, response_key.clone(), &connector);
181                raw
182            });
183
184            // If this errors, it will write to the debug context because it
185            // has access to the raw bytes, so we can't write to it again
186            // in any RawResponse::Error branches.
187            let mapped = match &deserialized_body {
188                Err(error) => MappedResponse::Error {
189                    error: error.as_ref().clone(),
190                    key: response_key,
191                    problems: Vec::new(),
192                },
193                Ok(data) => handle_raw_response(
194                    data,
195                    &parts,
196                    response_key,
197                    &connector,
198                    context,
199                    supergraph_request.headers(),
200                )
201                .apply_operation(
202                    operation
203                        .as_ref()
204                        .map(|arc_valid_doc| arc_valid_doc.as_ref().as_ref()),
205                    &connector.schema_subtypes_map,
206                ),
207            };
208
209            if let Some(debug) = debug_context {
210                let mut debug_problems: Vec<Problem> = mapped.problems().to_vec();
211                debug_problems.extend(debug_request.1);
212
213                let selection_data = if let MappedResponse::Data { key, data, .. } = &mapped {
214                    Some(SelectionData {
215                        source: connector.selection.to_string(),
216                        transformed: key.selection().to_string(),
217                        result: Some(data.clone()),
218                    })
219                } else {
220                    None
221                };
222
223                debug.lock().push_response(
224                    debug_request.0,
225                    &parts,
226                    deserialized_body.ok().as_ref().unwrap_or(&Value::Null),
227                    selection_data,
228                    &connector.error_settings,
229                    debug_problems,
230                );
231            }
232            if matches!(mapped, MappedResponse::Data { .. }) {
233                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_OK);
234            } else {
235                Span::current().record(OTEL_STATUS_CODE, OTEL_STATUS_CODE_ERROR);
236            }
237
238            (mapped, result)
239        }
240    };
241
242    if let MappedResponse::Error { ref error, .. } = mapped_response {
243        emit_error_event(error.code(), &error.message, Some((*error.path).into()));
244    }
245
246    connector::request_service::Response {
247        context: context.clone(),
248        transport_result: result,
249        mapped_response,
250    }
251}
252
253pub(crate) fn aggregate_responses(
254    responses: Vec<MappedResponse>,
255    _context: Context,
256) -> Result<Response, HandleResponseError> {
257    let mut data = serde_json_bytes::Map::new();
258    let mut errors = Vec::new();
259    let count = responses.len();
260
261    for mapped in responses {
262        mapped.add_to_data(&mut data, &mut errors, count)?;
263    }
264
265    let data = if data.is_empty() {
266        Value::Null
267    } else {
268        Value::Object(data)
269    };
270
271    Span::current().record(
272        OTEL_STATUS_CODE,
273        if errors.is_empty() {
274            OTEL_STATUS_CODE_OK
275        } else {
276            OTEL_STATUS_CODE_ERROR
277        },
278    );
279
280    Ok(Response {
281        response: http::Response::builder()
282            .body(
283                graphql::Response::builder()
284                    .data(data)
285                    .errors(errors.into_iter().map(|e| e.into()).collect())
286                    .build(),
287            )
288            .unwrap(),
289    })
290}
291
292fn log_connectors_event(
293    context: &Context,
294    body: &[u8],
295    parts: &Parts,
296    response_key: ResponseKey,
297    connector: &Connector,
298) {
299    let log_response_level = context
300        .extensions()
301        .with_lock(|lock| lock.get::<ConnectorEventResponse>().cloned())
302        .and_then(|event| {
303            // TODO: evaluate if this is still needed now that we're cloning the body anyway
304            // Create a temporary response here so we can evaluate the condition. This response
305            // is missing any information about the mapped response, because we don't have that
306            // yet. This means that we cannot correctly evaluate any condition that relies on
307            // the mapped response data or mapping problems. But we can't wait until we do have
308            // that information, because this is the only place we have the body bytes (without
309            // making an expensive clone of the body). So we either need to not expose any
310            // selector which can be used as a condition that requires mapping information, or
311            // we must document that such selectors cannot be used as conditions on standard
312            // connectors events.
313
314            let response = connector::request_service::Response {
315                context: context.clone(),
316                transport_result: Ok(TransportResponse::Http(HttpResponse {
317                    inner: parts.clone(),
318                })),
319                mapped_response: MappedResponse::Data {
320                    data: Value::Null,
321                    key: response_key,
322                    problems: vec![],
323                },
324            };
325            if event.condition.evaluate_response(&response) {
326                Some(event.level)
327            } else {
328                None
329            }
330        });
331
332    if let Some(level) = log_response_level {
333        let mut attrs = Vec::with_capacity(4);
334        #[cfg(test)]
335        let headers = {
336            let mut headers: indexmap::IndexMap<String, http::HeaderValue> = parts
337                .headers
338                .iter()
339                .map(|(name, val)| (name.to_string(), val.clone()))
340                .collect();
341            headers.sort_keys();
342            headers
343        };
344        #[cfg(not(test))]
345        let headers = &parts.headers;
346
347        attrs.push(KeyValue::new(
348            HTTP_RESPONSE_HEADERS,
349            opentelemetry::Value::String(format!("{headers:?}").into()),
350        ));
351        attrs.push(KeyValue::new(
352            HTTP_RESPONSE_STATUS,
353            opentelemetry::Value::String(format!("{}", parts.status).into()),
354        ));
355        attrs.push(KeyValue::new(
356            HTTP_RESPONSE_VERSION,
357            opentelemetry::Value::String(format!("{:?}", parts.version).into()),
358        ));
359        attrs.push(KeyValue::new(
360            HTTP_RESPONSE_BODY,
361            opentelemetry::Value::String(String::from_utf8_lossy(body).into_owned().into()),
362        ));
363
364        log_event(
365            level,
366            "connector.response",
367            attrs,
368            &format!("Response from connector {label:?}", label = connector.label),
369        );
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use std::sync::Arc;
376
377    use apollo_compiler::Schema;
378    use apollo_compiler::collections::IndexMap;
379    use apollo_compiler::name;
380    use apollo_compiler::response::JsonValue;
381    use apollo_federation::connectors::ConnectId;
382    use apollo_federation::connectors::ConnectSpec;
383    use apollo_federation::connectors::Connector;
384    use apollo_federation::connectors::ConnectorErrorsSettings;
385    use apollo_federation::connectors::EntityResolver;
386    use apollo_federation::connectors::HTTPMethod;
387    use apollo_federation::connectors::HttpJsonTransport;
388    use apollo_federation::connectors::JSONSelection;
389    use apollo_federation::connectors::Label;
390    use apollo_federation::connectors::Namespace;
391    use apollo_federation::connectors::runtime::inputs::RequestInputs;
392    use apollo_federation::connectors::runtime::key::ResponseKey;
393    use insta::assert_debug_snapshot;
394    use itertools::Itertools;
395    use serde_json_bytes::json;
396
397    use crate::Context;
398    use crate::graphql;
399    use crate::plugins::connectors::handle_responses::process_response;
400    use crate::services::router;
401    use crate::services::router::body::RouterBody;
402
403    #[tokio::test]
404    async fn test_handle_responses_root_fields() {
405        let connector = Arc::new(Connector {
406            spec: ConnectSpec::V0_1,
407            schema_subtypes_map: Default::default(),
408            id: ConnectId::new(
409                "subgraph_name".into(),
410                None,
411                name!(Query),
412                name!(hello),
413                None,
414                0,
415            ),
416            transport: Some(HttpJsonTransport {
417                source_template: "http://localhost/api".parse().ok(),
418                connect_template: "/path".parse().unwrap(),
419                ..Default::default()
420            }),
421            selection: JSONSelection::parse("$.data").unwrap(),
422            entity_resolver: None,
423            config: Default::default(),
424            max_requests: None,
425            batch_settings: None,
426            request_headers: Default::default(),
427            response_headers: Default::default(),
428            request_variable_keys: Default::default(),
429            response_variable_keys: Default::default(),
430            error_settings: Default::default(),
431            label: "test label".into(),
432        });
433
434        let response1: http::Response<RouterBody> = http::Response::builder()
435            .body(router::body::from_bytes(r#"{"data":"world"}"#))
436            .unwrap();
437        let response_key1 = ResponseKey::RootField {
438            name: "hello".to_string(),
439            inputs: Default::default(),
440            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
441        };
442
443        let response2 = http::Response::builder()
444            .body(router::body::from_bytes(r#"{"data":"world"}"#))
445            .unwrap();
446        let response_key2 = ResponseKey::RootField {
447            name: "hello2".to_string(),
448            inputs: Default::default(),
449            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
450        };
451
452        let supergraph_request = Arc::new(
453            http::Request::builder()
454                .body(graphql::Request::builder().build())
455                .unwrap(),
456        );
457
458        let res = super::aggregate_responses(
459            vec![
460                process_response(
461                    Ok(response1),
462                    response_key1,
463                    connector.clone(),
464                    &Context::default(),
465                    (None, Default::default()),
466                    None,
467                    supergraph_request.clone(),
468                    Default::default(),
469                )
470                .await
471                .mapped_response,
472                process_response(
473                    Ok(response2),
474                    response_key2,
475                    connector,
476                    &Context::default(),
477                    (None, Default::default()),
478                    None,
479                    supergraph_request,
480                    Default::default(),
481                )
482                .await
483                .mapped_response,
484            ],
485            Context::new(),
486        )
487        .unwrap();
488
489        assert_debug_snapshot!(res.response, @r#"
490        Response {
491            status: 200,
492            version: HTTP/1.1,
493            headers: {},
494            body: Response {
495                label: None,
496                data: Some(
497                    Object({
498                        "hello": String(
499                            "world",
500                        ),
501                        "hello2": String(
502                            "world",
503                        ),
504                    }),
505                ),
506                path: None,
507                errors: [],
508                extensions: {},
509                has_next: None,
510                subscribed: None,
511                created_at: None,
512                incremental: [],
513            },
514        }
515        "#);
516    }
517
518    #[tokio::test]
519    async fn test_handle_responses_entities() {
520        let connector = Arc::new(Connector {
521            spec: ConnectSpec::V0_1,
522            schema_subtypes_map: Default::default(),
523            id: ConnectId::new(
524                "subgraph_name".into(),
525                None,
526                name!(Query),
527                name!(user),
528                None,
529                0,
530            ),
531            transport: Some(HttpJsonTransport {
532                source_template: "http://localhost/api".parse().ok(),
533                connect_template: "/path".parse().unwrap(),
534                ..Default::default()
535            }),
536            selection: JSONSelection::parse("$.data { id }").unwrap(),
537            entity_resolver: Some(EntityResolver::Explicit),
538            config: Default::default(),
539            max_requests: None,
540            batch_settings: None,
541            request_headers: Default::default(),
542            response_headers: Default::default(),
543            request_variable_keys: Default::default(),
544            response_variable_keys: Default::default(),
545            error_settings: Default::default(),
546            label: "test label".into(),
547        });
548
549        let response1: http::Response<RouterBody> = http::Response::builder()
550            .body(router::body::from_bytes(r#"{"data":{"id": "1"}}"#))
551            .unwrap();
552        let response_key1 = ResponseKey::Entity {
553            index: 0,
554            inputs: Default::default(),
555            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
556        };
557
558        let response2 = http::Response::builder()
559            .body(router::body::from_bytes(r#"{"data":{"id": "2"}}"#))
560            .unwrap();
561        let response_key2 = ResponseKey::Entity {
562            index: 1,
563            inputs: Default::default(),
564            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
565        };
566
567        let supergraph_request = Arc::new(
568            http::Request::builder()
569                .body(graphql::Request::builder().build())
570                .unwrap(),
571        );
572
573        let res = super::aggregate_responses(
574            vec![
575                process_response(
576                    Ok(response1),
577                    response_key1,
578                    connector.clone(),
579                    &Context::default(),
580                    (None, Default::default()),
581                    None,
582                    supergraph_request.clone(),
583                    Default::default(),
584                )
585                .await
586                .mapped_response,
587                process_response(
588                    Ok(response2),
589                    response_key2,
590                    connector,
591                    &Context::default(),
592                    (None, Default::default()),
593                    None,
594                    supergraph_request,
595                    Default::default(),
596                )
597                .await
598                .mapped_response,
599            ],
600            Context::new(),
601        )
602        .unwrap();
603
604        assert_debug_snapshot!(res.response, @r#"
605        Response {
606            status: 200,
607            version: HTTP/1.1,
608            headers: {},
609            body: Response {
610                label: None,
611                data: Some(
612                    Object({
613                        "_entities": Array([
614                            Object({
615                                "id": String(
616                                    "1",
617                                ),
618                            }),
619                            Object({
620                                "id": String(
621                                    "2",
622                                ),
623                            }),
624                        ]),
625                    }),
626                ),
627                path: None,
628                errors: [],
629                extensions: {},
630                has_next: None,
631                subscribed: None,
632                created_at: None,
633                incremental: [],
634            },
635        }
636        "#);
637    }
638
639    #[tokio::test]
640    async fn test_handle_responses_batch() {
641        let connector = Arc::new(Connector {
642            spec: ConnectSpec::V0_2,
643            id: ConnectId::new_on_object("subgraph_name".into(), None, name!(User), None, 0),
644            schema_subtypes_map: Default::default(),
645            transport: Some(HttpJsonTransport {
646                source_template: "http://localhost/api".parse().ok(),
647                connect_template: "/path".parse().unwrap(),
648                method: HTTPMethod::Post,
649                body: Some(JSONSelection::parse("ids: $batch.id").unwrap()),
650                ..Default::default()
651            }),
652            selection: JSONSelection::parse("$.data { id name }").unwrap(),
653            entity_resolver: Some(EntityResolver::TypeBatch),
654            config: Default::default(),
655            max_requests: None,
656            batch_settings: None,
657            request_headers: Default::default(),
658            response_headers: Default::default(),
659            request_variable_keys: Default::default(),
660            response_variable_keys: Default::default(),
661            error_settings: Default::default(),
662            label: "test label".into(),
663        });
664
665        let keys = connector
666            .resolvable_key(
667                &Schema::parse_and_validate("type Query { _: ID } type User { id: ID! }", "")
668                    .unwrap(),
669            )
670            .unwrap()
671            .unwrap();
672
673        let response1: http::Response<RouterBody> = http::Response::builder()
674            // different order from the request inputs
675            .body(router::body::from_bytes(
676                r#"{"data":[{"id": "2","name":"B"},{"id": "1","name":"A"}]}"#,
677            ))
678            .unwrap();
679
680        let mut inputs: RequestInputs = RequestInputs::default();
681        let representations = serde_json_bytes::json!([{"__typename": "User", "id": "1"}, {"__typename": "User", "id": "2"}]);
682        inputs.batch = representations
683            .as_array()
684            .unwrap()
685            .iter()
686            .map(|v| v.as_object().unwrap().clone())
687            .collect_vec();
688
689        let response_key1 = ResponseKey::BatchEntity {
690            selection: Arc::new(JSONSelection::parse("$.data { id name }").unwrap()),
691            keys,
692            inputs,
693        };
694
695        let supergraph_request = Arc::new(
696            http::Request::builder()
697                .body(graphql::Request::builder().build())
698                .unwrap(),
699        );
700
701        let res = super::aggregate_responses(
702            vec![
703                process_response(
704                    Ok(response1),
705                    response_key1,
706                    connector.clone(),
707                    &Context::default(),
708                    (None, Default::default()),
709                    None,
710                    supergraph_request,
711                    Default::default(),
712                )
713                .await
714                .mapped_response,
715            ],
716            Context::new(),
717        )
718        .unwrap();
719
720        assert_debug_snapshot!(res.response, @r#"
721        Response {
722            status: 200,
723            version: HTTP/1.1,
724            headers: {},
725            body: Response {
726                label: None,
727                data: Some(
728                    Object({
729                        "_entities": Array([
730                            Object({
731                                "id": String(
732                                    "1",
733                                ),
734                                "name": String(
735                                    "A",
736                                ),
737                            }),
738                            Object({
739                                "id": String(
740                                    "2",
741                                ),
742                                "name": String(
743                                    "B",
744                                ),
745                            }),
746                        ]),
747                    }),
748                ),
749                path: None,
750                errors: [],
751                extensions: {},
752                has_next: None,
753                subscribed: None,
754                created_at: None,
755                incremental: [],
756            },
757        }
758        "#);
759    }
760
761    #[tokio::test]
762    async fn test_handle_responses_entity_field() {
763        let connector = Arc::new(Connector {
764            spec: ConnectSpec::V0_1,
765            schema_subtypes_map: Default::default(),
766            id: ConnectId::new(
767                "subgraph_name".into(),
768                None,
769                name!(User),
770                name!(field),
771                None,
772                0,
773            ),
774            transport: Some(HttpJsonTransport {
775                source_template: "http://localhost/api".parse().ok(),
776                connect_template: "/path".parse().unwrap(),
777                ..Default::default()
778            }),
779            selection: JSONSelection::parse("$.data").unwrap(),
780            entity_resolver: Some(EntityResolver::Implicit),
781            config: Default::default(),
782            max_requests: None,
783            batch_settings: None,
784            request_headers: Default::default(),
785            response_headers: Default::default(),
786            request_variable_keys: Default::default(),
787            response_variable_keys: Default::default(),
788            error_settings: Default::default(),
789            label: "test label".into(),
790        });
791
792        let response1: http::Response<RouterBody> = http::Response::builder()
793            .body(router::body::from_bytes(r#"{"data":"value1"}"#))
794            .unwrap();
795        let response_key1 = ResponseKey::EntityField {
796            index: 0,
797            inputs: Default::default(),
798            field_name: "field".to_string(),
799            typename: Some(name!("User")),
800            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
801        };
802
803        let response2 = http::Response::builder()
804            .body(router::body::from_bytes(r#"{"data":"value2"}"#))
805            .unwrap();
806        let response_key2 = ResponseKey::EntityField {
807            index: 1,
808            inputs: Default::default(),
809            field_name: "field".to_string(),
810            typename: Some(name!("User")),
811            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
812        };
813
814        let supergraph_request = Arc::new(
815            http::Request::builder()
816                .body(graphql::Request::builder().build())
817                .unwrap(),
818        );
819
820        let res = super::aggregate_responses(
821            vec![
822                process_response(
823                    Ok(response1),
824                    response_key1,
825                    connector.clone(),
826                    &Context::default(),
827                    (None, Default::default()),
828                    None,
829                    supergraph_request.clone(),
830                    Default::default(),
831                )
832                .await
833                .mapped_response,
834                process_response(
835                    Ok(response2),
836                    response_key2,
837                    connector,
838                    &Context::default(),
839                    (None, Default::default()),
840                    None,
841                    supergraph_request,
842                    Default::default(),
843                )
844                .await
845                .mapped_response,
846            ],
847            Context::new(),
848        )
849        .unwrap();
850
851        assert_debug_snapshot!(res.response, @r#"
852        Response {
853            status: 200,
854            version: HTTP/1.1,
855            headers: {},
856            body: Response {
857                label: None,
858                data: Some(
859                    Object({
860                        "_entities": Array([
861                            Object({
862                                "__typename": String(
863                                    "User",
864                                ),
865                                "field": String(
866                                    "value1",
867                                ),
868                            }),
869                            Object({
870                                "__typename": String(
871                                    "User",
872                                ),
873                                "field": String(
874                                    "value2",
875                                ),
876                            }),
877                        ]),
878                    }),
879                ),
880                path: None,
881                errors: [],
882                extensions: {},
883                has_next: None,
884                subscribed: None,
885                created_at: None,
886                incremental: [],
887            },
888        }
889        "#);
890    }
891
892    #[tokio::test]
893    async fn test_handle_responses_errors() {
894        let connector = Arc::new(Connector {
895            spec: ConnectSpec::V0_1,
896            schema_subtypes_map: Default::default(),
897            id: ConnectId::new(
898                "subgraph_name".into(),
899                None,
900                name!(Query),
901                name!(user),
902                None,
903                0,
904            ),
905            transport: Some(HttpJsonTransport {
906                source_template: "http://localhost/api".parse().ok(),
907                connect_template: "/path".parse().unwrap(),
908                ..Default::default()
909            }),
910            selection: JSONSelection::parse("$.data").unwrap(),
911            entity_resolver: Some(EntityResolver::Explicit),
912            config: Default::default(),
913            max_requests: None,
914            batch_settings: None,
915            request_headers: Default::default(),
916            response_headers: Default::default(),
917            request_variable_keys: Default::default(),
918            response_variable_keys: Default::default(),
919            error_settings: Default::default(),
920            label: "test label".into(),
921        });
922
923        let response_plaintext: http::Response<RouterBody> = http::Response::builder()
924            .body(router::body::from_bytes(r#"plain text"#))
925            .unwrap();
926        let response_key_plaintext = ResponseKey::Entity {
927            index: 0,
928            inputs: Default::default(),
929            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
930        };
931
932        let response1: http::Response<RouterBody> = http::Response::builder()
933            .status(404)
934            .body(router::body::from_bytes(r#"{"error":"not found"}"#))
935            .unwrap();
936        let response_key1 = ResponseKey::Entity {
937            index: 1,
938            inputs: Default::default(),
939            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
940        };
941
942        let response2 = http::Response::builder()
943            .body(router::body::from_bytes(r#"{"data":{"id":"2"}}"#))
944            .unwrap();
945        let response_key2 = ResponseKey::Entity {
946            index: 2,
947            inputs: Default::default(),
948            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
949        };
950
951        let response3: http::Response<RouterBody> = http::Response::builder()
952            .status(500)
953            .body(router::body::from_bytes(r#"{"error":"whoops"}"#))
954            .unwrap();
955        let response_key3 = ResponseKey::Entity {
956            index: 3,
957            inputs: Default::default(),
958            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
959        };
960
961        let supergraph_request = Arc::new(
962            http::Request::builder()
963                .body(graphql::Request::builder().build())
964                .unwrap(),
965        );
966
967        let mut res = super::aggregate_responses(
968            vec![
969                process_response(
970                    Ok(response_plaintext),
971                    response_key_plaintext,
972                    connector.clone(),
973                    &Context::default(),
974                    (None, Default::default()),
975                    None,
976                    supergraph_request.clone(),
977                    Default::default(),
978                )
979                .await
980                .mapped_response,
981                process_response(
982                    Ok(response1),
983                    response_key1,
984                    connector.clone(),
985                    &Context::default(),
986                    (None, Default::default()),
987                    None,
988                    supergraph_request.clone(),
989                    Default::default(),
990                )
991                .await
992                .mapped_response,
993                process_response(
994                    Ok(response2),
995                    response_key2,
996                    connector.clone(),
997                    &Context::default(),
998                    (None, Default::default()),
999                    None,
1000                    supergraph_request.clone(),
1001                    Default::default(),
1002                )
1003                .await
1004                .mapped_response,
1005                process_response(
1006                    Ok(response3),
1007                    response_key3,
1008                    connector,
1009                    &Context::default(),
1010                    (None, Default::default()),
1011                    None,
1012                    supergraph_request,
1013                    Default::default(),
1014                )
1015                .await
1016                .mapped_response,
1017            ],
1018            Context::new(),
1019        )
1020        .unwrap();
1021
1022        // Overwrite error IDs to avoid random Uuid mismatch.
1023        // Since assert_debug_snapshot does not support redactions (which would be useful for error IDs),
1024        // we have to do it manually.
1025        let body = res.response.body_mut();
1026        body.errors = body.errors.iter_mut().map(|e| e.with_null_id()).collect();
1027
1028        assert_debug_snapshot!(res.response, @r#"
1029        Response {
1030            status: 200,
1031            version: HTTP/1.1,
1032            headers: {},
1033            body: Response {
1034                label: None,
1035                data: Some(
1036                    Object({
1037                        "_entities": Array([
1038                            Null,
1039                            Null,
1040                            Object({
1041                                "id": String(
1042                                    "2",
1043                                ),
1044                            }),
1045                            Null,
1046                        ]),
1047                    }),
1048                ),
1049                path: None,
1050                errors: [
1051                    Error {
1052                        message: "The server returned data in an unexpected format.",
1053                        locations: [],
1054                        path: Some(
1055                            Path(
1056                                [
1057                                    Key(
1058                                        "_entities",
1059                                        None,
1060                                    ),
1061                                    Index(
1062                                        0,
1063                                    ),
1064                                ],
1065                            ),
1066                        ),
1067                        extensions: {
1068                            "code": String(
1069                                "CONNECTOR_RESPONSE_INVALID",
1070                            ),
1071                            "service": String(
1072                                "subgraph_name",
1073                            ),
1074                            "connector": Object({
1075                                "coordinate": String(
1076                                    "subgraph_name:Query.user[0]",
1077                                ),
1078                            }),
1079                            "http": Object({
1080                                "status": Number(200),
1081                            }),
1082                            "apollo.private.subgraph.name": String(
1083                                "subgraph_name",
1084                            ),
1085                        },
1086                        apollo_id: 00000000-0000-0000-0000-000000000000,
1087                    },
1088                    Error {
1089                        message: "Request failed",
1090                        locations: [],
1091                        path: Some(
1092                            Path(
1093                                [
1094                                    Key(
1095                                        "_entities",
1096                                        None,
1097                                    ),
1098                                    Index(
1099                                        1,
1100                                    ),
1101                                ],
1102                            ),
1103                        ),
1104                        extensions: {
1105                            "code": String(
1106                                "CONNECTOR_FETCH",
1107                            ),
1108                            "service": String(
1109                                "subgraph_name",
1110                            ),
1111                            "connector": Object({
1112                                "coordinate": String(
1113                                    "subgraph_name:Query.user[0]",
1114                                ),
1115                            }),
1116                            "http": Object({
1117                                "status": Number(404),
1118                            }),
1119                            "apollo.private.subgraph.name": String(
1120                                "subgraph_name",
1121                            ),
1122                        },
1123                        apollo_id: 00000000-0000-0000-0000-000000000000,
1124                    },
1125                    Error {
1126                        message: "Request failed",
1127                        locations: [],
1128                        path: Some(
1129                            Path(
1130                                [
1131                                    Key(
1132                                        "_entities",
1133                                        None,
1134                                    ),
1135                                    Index(
1136                                        3,
1137                                    ),
1138                                ],
1139                            ),
1140                        ),
1141                        extensions: {
1142                            "code": String(
1143                                "CONNECTOR_FETCH",
1144                            ),
1145                            "service": String(
1146                                "subgraph_name",
1147                            ),
1148                            "connector": Object({
1149                                "coordinate": String(
1150                                    "subgraph_name:Query.user[0]",
1151                                ),
1152                            }),
1153                            "http": Object({
1154                                "status": Number(500),
1155                            }),
1156                            "apollo.private.subgraph.name": String(
1157                                "subgraph_name",
1158                            ),
1159                        },
1160                        apollo_id: 00000000-0000-0000-0000-000000000000,
1161                    },
1162                ],
1163                extensions: {},
1164                has_next: None,
1165                subscribed: None,
1166                created_at: None,
1167                incremental: [],
1168            },
1169        }
1170        "#);
1171    }
1172
1173    #[tokio::test]
1174    async fn test_handle_responses_status() {
1175        let selection = JSONSelection::parse("$status").unwrap();
1176        let connector = Arc::new(Connector {
1177            spec: ConnectSpec::V0_1,
1178            schema_subtypes_map: Default::default(),
1179            id: ConnectId::new(
1180                "subgraph_name".into(),
1181                None,
1182                name!(Query),
1183                name!(hello),
1184                None,
1185                0,
1186            ),
1187            transport: Some(HttpJsonTransport {
1188                source_template: "http://localhost/api".parse().ok(),
1189                connect_template: "/path".parse().unwrap(),
1190                ..Default::default()
1191            }),
1192            selection: selection.clone(),
1193            entity_resolver: None,
1194            config: Default::default(),
1195            max_requests: None,
1196            batch_settings: None,
1197            request_headers: Default::default(),
1198            response_headers: Default::default(),
1199            request_variable_keys: Default::default(),
1200            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1201            error_settings: Default::default(),
1202            label: "test label".into(),
1203        });
1204
1205        let response1: http::Response<RouterBody> = http::Response::builder()
1206            .status(201)
1207            .body(router::body::from_bytes(r#"{}"#))
1208            .unwrap();
1209        let response_key1 = ResponseKey::RootField {
1210            name: "hello".to_string(),
1211            inputs: Default::default(),
1212            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1213        };
1214
1215        let supergraph_request = Arc::new(
1216            http::Request::builder()
1217                .body(graphql::Request::builder().build())
1218                .unwrap(),
1219        );
1220
1221        let res = super::aggregate_responses(
1222            vec![
1223                process_response(
1224                    Ok(response1),
1225                    response_key1,
1226                    connector,
1227                    &Context::default(),
1228                    (None, Default::default()),
1229                    None,
1230                    supergraph_request,
1231                    Default::default(),
1232                )
1233                .await
1234                .mapped_response,
1235            ],
1236            Context::new(),
1237        )
1238        .unwrap();
1239
1240        assert_debug_snapshot!(res.response, @r#"
1241        Response {
1242            status: 200,
1243            version: HTTP/1.1,
1244            headers: {},
1245            body: Response {
1246                label: None,
1247                data: Some(
1248                    Object({
1249                        "hello": Number(201),
1250                    }),
1251                ),
1252                path: None,
1253                errors: [],
1254                extensions: {},
1255                has_next: None,
1256                subscribed: None,
1257                created_at: None,
1258                incremental: [],
1259            },
1260        }
1261        "#);
1262    }
1263
1264    #[tokio::test]
1265    async fn test_handle_response_with_is_success() {
1266        let is_success = JSONSelection::parse("$status ->eq(400)").unwrap();
1267        let selection = JSONSelection::parse("$status").unwrap();
1268        let error_settings: ConnectorErrorsSettings = ConnectorErrorsSettings {
1269            message: Default::default(),
1270            source_extensions: Default::default(),
1271            connect_extensions: Default::default(),
1272            connect_is_success: Some(is_success.clone()),
1273        };
1274        let connector = Arc::new(Connector {
1275            spec: ConnectSpec::V0_1,
1276            schema_subtypes_map: Default::default(),
1277            id: ConnectId::new(
1278                "subgraph_name".into(),
1279                None,
1280                name!(Query),
1281                name!(hello),
1282                None,
1283                0,
1284            ),
1285            transport: Some(HttpJsonTransport {
1286                source_template: "http://localhost/api".parse().ok(),
1287                connect_template: "/path".parse().unwrap(),
1288                ..Default::default()
1289            }),
1290            selection: selection.clone(),
1291            entity_resolver: None,
1292            config: Default::default(),
1293            max_requests: None,
1294            batch_settings: None,
1295            request_headers: Default::default(),
1296            response_headers: Default::default(),
1297            request_variable_keys: Default::default(),
1298            response_variable_keys: IndexMap::from_iter([(Namespace::Status, Default::default())]),
1299            error_settings,
1300            label: Label::from("test label"),
1301        });
1302
1303        // First request should be marked as error as status is NOT 400
1304        let response_fail: http::Response<RouterBody> = http::Response::builder()
1305            .status(201)
1306            .body(router::body::from_bytes(r#"{}"#))
1307            .unwrap();
1308        let response_fail_key = ResponseKey::RootField {
1309            name: "hello".to_string(),
1310            inputs: Default::default(),
1311            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1312        };
1313
1314        // Second response should be marked as a success as the status is 400!
1315        let response_succeed: http::Response<RouterBody> = http::Response::builder()
1316            .status(400)
1317            .body(router::body::from_bytes(r#"{}"#))
1318            .unwrap();
1319        let response_succeed_key = ResponseKey::RootField {
1320            name: "hello".to_string(),
1321            inputs: Default::default(),
1322            selection: Arc::new(JSONSelection::parse("$status").unwrap()),
1323        };
1324
1325        let supergraph_request = Arc::new(
1326            http::Request::builder()
1327                .body(graphql::Request::builder().build())
1328                .unwrap(),
1329        );
1330
1331        // Make failing request
1332        let res_expect_fail = super::aggregate_responses(
1333            vec![
1334                process_response(
1335                    Ok(response_fail),
1336                    response_fail_key,
1337                    connector.clone(),
1338                    &Context::default(),
1339                    (None, Default::default()),
1340                    None,
1341                    supergraph_request.clone(),
1342                    Default::default(),
1343                )
1344                .await
1345                .mapped_response,
1346            ],
1347            Context::new(),
1348        )
1349        .unwrap()
1350        .response;
1351        assert_eq!(res_expect_fail.body().data, Some(JsonValue::Null));
1352        assert_eq!(res_expect_fail.body().errors.len(), 1);
1353
1354        // Make succeeding request
1355        let res_expect_success = super::aggregate_responses(
1356            vec![
1357                process_response(
1358                    Ok(response_succeed),
1359                    response_succeed_key,
1360                    connector.clone(),
1361                    &Context::default(),
1362                    (None, Default::default()),
1363                    None,
1364                    supergraph_request.clone(),
1365                    Default::default(),
1366                )
1367                .await
1368                .mapped_response,
1369            ],
1370            Context::new(),
1371        )
1372        .unwrap()
1373        .response;
1374        assert!(res_expect_success.body().errors.is_empty());
1375        assert_eq!(
1376            &res_expect_success.body().data,
1377            &Some(json!({"hello": json!(400)}))
1378        );
1379    }
1380
1381    fn make_connector() -> Arc<Connector> {
1382        Arc::new(Connector {
1383            spec: ConnectSpec::V0_1,
1384            schema_subtypes_map: Default::default(),
1385            id: ConnectId::new(
1386                "subgraph_name".into(),
1387                None,
1388                name!(Query),
1389                name!(hello),
1390                None,
1391                0,
1392            ),
1393            transport: Some(HttpJsonTransport {
1394                source_template: "http://localhost/api".parse().ok(),
1395                connect_template: "/path".parse().unwrap(),
1396                ..Default::default()
1397            }),
1398            selection: JSONSelection::parse("$.data").unwrap(),
1399            entity_resolver: None,
1400            config: Default::default(),
1401            max_requests: None,
1402            batch_settings: None,
1403            request_headers: Default::default(),
1404            response_headers: Default::default(),
1405            request_variable_keys: Default::default(),
1406            response_variable_keys: Default::default(),
1407            error_settings: Default::default(),
1408            label: "test label".into(),
1409        })
1410    }
1411
1412    fn make_supergraph_request() -> Arc<http::Request<graphql::Request>> {
1413        Arc::new(
1414            http::Request::builder()
1415                .body(graphql::Request::builder().build())
1416                .unwrap(),
1417        )
1418    }
1419
1420    #[tokio::test]
1421    async fn process_response_under_size_limit() {
1422        use crate::plugins::limits::ConnectorResponseSizeLimit;
1423
1424        let ctx = Context::new();
1425        ctx.extensions()
1426            .with_lock(|e| e.insert(ConnectorResponseSizeLimit(1000)));
1427
1428        let key = ResponseKey::RootField {
1429            name: "hello".to_string(),
1430            inputs: Default::default(),
1431            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1432        };
1433        let response = http::Response::builder()
1434            .body(router::body::from_bytes(r#"{"data":"world"}"#))
1435            .unwrap();
1436
1437        let result = process_response(
1438            Ok(response),
1439            key,
1440            make_connector(),
1441            &ctx,
1442            (None, Default::default()),
1443            None,
1444            make_supergraph_request(),
1445            Default::default(),
1446        )
1447        .await;
1448
1449        let graphql_response =
1450            super::aggregate_responses(vec![result.mapped_response], Context::new())
1451                .unwrap()
1452                .response;
1453        assert!(
1454            graphql_response.body().errors.is_empty(),
1455            "expected no errors when response is under the limit"
1456        );
1457    }
1458
1459    #[tokio::test]
1460    async fn process_response_exceeds_size_limit() {
1461        use crate::plugins::limits::ConnectorResponseSizeLimit;
1462
1463        let ctx = Context::new();
1464        // Limit of 5 bytes — well under the response body size
1465        ctx.extensions()
1466            .with_lock(|e| e.insert(ConnectorResponseSizeLimit(5)));
1467
1468        let key = ResponseKey::RootField {
1469            name: "hello".to_string(),
1470            inputs: Default::default(),
1471            selection: Arc::new(JSONSelection::parse("$.data").unwrap()),
1472        };
1473        let response = http::Response::builder()
1474            .body(router::body::from_bytes(r#"{"data":"world"}"#))
1475            .unwrap();
1476
1477        let result = process_response(
1478            Ok(response),
1479            key,
1480            make_connector(),
1481            &ctx,
1482            (None, Default::default()),
1483            None,
1484            make_supergraph_request(),
1485            Default::default(),
1486        )
1487        .await;
1488
1489        let graphql_response =
1490            super::aggregate_responses(vec![result.mapped_response], Context::new())
1491                .unwrap()
1492                .response;
1493        let errors = &graphql_response.body().errors;
1494        assert!(!errors.is_empty(), "expected an error for exceeded limit");
1495        assert!(
1496            errors[0].message.contains("exceeded limit of 5 bytes"),
1497            "unexpected error message: {}",
1498            errors[0].message
1499        );
1500    }
1501}