Skip to main content

apollo_federation/connectors/runtime/
responses.rs

1use apollo_compiler::ExecutableDocument;
2use apollo_compiler::Name;
3use apollo_compiler::collections::HashMap;
4use apollo_compiler::collections::IndexMap;
5use apollo_compiler::collections::IndexSet;
6use apollo_compiler::executable::Selection;
7use apollo_compiler::executable::SelectionSet;
8use encoding_rs::Encoding;
9use encoding_rs::UTF_8;
10use http::HeaderMap;
11use http::HeaderValue;
12use http::header::CONTENT_LENGTH;
13use http::header::CONTENT_TYPE;
14use http::response::Parts;
15use itertools::Itertools;
16use mime::Mime;
17use serde_json_bytes::ByteString;
18use serde_json_bytes::Map;
19use serde_json_bytes::Value;
20
21use crate::connectors::Connector;
22use crate::connectors::JSONSelection;
23use crate::connectors::ProblemLocation;
24use crate::connectors::runtime::errors::RuntimeError;
25use crate::connectors::runtime::inputs::ContextReader;
26use crate::connectors::runtime::key::ResponseKey;
27use crate::connectors::runtime::mapping::Problem;
28use crate::connectors::runtime::mapping::aggregate_apply_to_errors;
29use crate::connectors::runtime::responses::DeserializeError::ContentDecoding;
30
31const ENTITIES: &str = "_entities";
32const TYPENAME: &str = "__typename";
33
/// Failure raised while merging a mapped connector response into the
/// aggregated GraphQL response data (see `MappedResponse::add_to_data`).
#[derive(Debug, thiserror::Error)]
pub enum HandleResponseError {
    /// The response could not be merged; the payload is a human-readable
    /// description of the reason (for example, `_entities` not being an array).
    #[error("Merge error: {0}")]
    MergeError(String),
}
39
40/// Converts a response body into a json Value based on the Content-Type header.
41pub fn deserialize_response(body: &[u8], headers: &HeaderMap) -> Result<Value, DeserializeError> {
42    // If the body is empty, there's nothing to parse. We check body.is_empty()
43    // directly because spec-compliant HTTP 204 responses must not include a
44    // Content-Length header — so we can't rely on that header alone to detect
45    // empty bodies. The Content-Length: 0 check is kept for non-compliant
46    // servers that do send it, but body.is_empty() covers both cases.
47    if body.is_empty()
48        || headers
49            .get(CONTENT_LENGTH)
50            .and_then(|len| len.to_str().ok())
51            .and_then(|s| s.parse::<usize>().ok())
52            .is_some_and(|content_length| content_length == 0)
53    {
54        return Ok(Value::Null);
55    }
56
57    let content_type = headers
58        .get(CONTENT_TYPE)
59        .and_then(|h| h.to_str().ok()?.parse::<Mime>().ok());
60
61    if content_type.is_none()
62        || content_type
63            .as_ref()
64            .is_some_and(|ct| ct.subtype() == mime::JSON || ct.suffix() == Some(mime::JSON))
65    {
66        // Treat any JSON-y like content types as JSON
67        // Also, because the HTTP spec says we should effectively "guess" the content type if there is no content type (None), we're going to guess it is JSON if the server has not specified one
68        serde_json::from_slice::<Value>(body).map_err(DeserializeError::SerdeJson)
69    } else if content_type
70        .as_ref()
71        .is_some_and(|ct| ct.type_() == mime::TEXT && ct.subtype() == mime::PLAIN)
72    {
73        // Plain text we can't parse as JSON so we'll instead return it as a JSON string
74        // Before we can do that, we need to figure out the charset and attempt to decode the string
75        let encoding = content_type
76            .as_ref()
77            .and_then(|ct| Encoding::for_label(ct.get_param("charset")?.as_str().as_bytes()))
78            .unwrap_or(UTF_8);
79        let (decoded_body, _, had_errors) = encoding.decode(body);
80
81        if had_errors {
82            return Err(ContentDecoding(encoding.name()));
83        }
84
85        Ok(Value::String(decoded_body.into_owned().into()))
86    } else {
87        // For any other content types, all we can do is treat it as a JSON null cause we don't know what it is
88        Ok(Value::Null)
89    }
90}
91
/// Errors produced by [`deserialize_response`] when a response body cannot be
/// converted into a JSON value.
#[derive(Debug, thiserror::Error)]
pub enum DeserializeError {
    /// The body was treated as JSON but `serde_json` failed to parse it.
    #[error("Could not parse JSON: {0}")]
    SerdeJson(#[source] serde_json::Error),
    /// A `text/plain` body contained malformed byte sequences for the chosen
    /// character encoding; the payload is that encoding's name.
    #[error("Could not decode data with content encoding {0}")]
    ContentDecoding(&'static str),
}
99
100pub fn handle_raw_response(
101    data: &Value,
102    parts: &Parts,
103    key: ResponseKey,
104    connector: &Connector,
105    context: impl ContextReader,
106    client_headers: &HeaderMap<HeaderValue>,
107) -> MappedResponse {
108    let inputs = key
109        .inputs()
110        .clone()
111        .merger(&connector.response_variable_keys)
112        .config(connector.config.as_ref())
113        .context(context)
114        .status(parts.status.as_u16())
115        .request(&connector.response_headers, client_headers)
116        .response(&connector.response_headers, Some(parts))
117        .merge();
118    let warnings = Vec::new();
119    let (success, warnings) = is_success(connector, data, parts, &inputs, warnings);
120    if success {
121        map_response(data, key, inputs, warnings)
122    } else {
123        map_error(connector, data, parts, key, inputs, warnings)
124    }
125}
126
/// Reshapes JSON data according to the selection sets of a GraphQL operation
/// (alias renaming, fragment expansion, field filtering).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct GraphQLDataMapper<'a> {
    /// Document used to resolve named fragments referenced by fragment spreads.
    doc: &'a ExecutableDocument,
    /// Maps a supertype name to the names of its subtypes; consulted
    /// (recursively) when evaluating fragment type conditions.
    subtypes_map: &'a IndexMap<String, IndexSet<String>>,
}
132
133impl<'a> GraphQLDataMapper<'a> {
134    fn new(
135        doc: &'a ExecutableDocument,
136        subtypes_map: &'a IndexMap<String, IndexSet<String>>,
137    ) -> Self {
138        Self { doc, subtypes_map }
139    }
140
141    fn fragment_matches(&self, data: &Value, fragment_type_condition: &Name) -> bool {
142        if let Some(data_typename) = data.get("__typename") {
143            match data_typename {
144                Value::String(typename) => {
145                    self.supertype_has_subtype(fragment_type_condition.as_str(), typename.as_str())
146                }
147                _ => false,
148            }
149        } else {
150            true
151        }
152    }
153
154    fn supertype_has_subtype(&self, supertype: &str, subtype: &str) -> bool {
155        if supertype == subtype {
156            true
157        } else if let Some(subtypes) = self.subtypes_map.get(supertype) {
158            subtypes
159                .iter()
160                .any(|s| self.supertype_has_subtype(s, subtype))
161        } else {
162            false
163        }
164    }
165
166    fn map_data(&self, data: &Value, selection_set: &SelectionSet) -> Value {
167        if selection_set.selections.is_empty() {
168            return data.clone();
169        }
170
171        match data {
172            Value::Object(map) => {
173                let mut new_map = Map::new();
174
175                for field in selection_set.selections.iter() {
176                    match field {
177                        Selection::Field(field) => {
178                            if let Some(field_value) = map.get(field.name.as_str()) {
179                                let output_field_name = field.alias.as_ref().unwrap_or(&field.name);
180                                new_map.insert(
181                                    output_field_name.to_string(),
182                                    self.map_data(field_value, &field.selection_set),
183                                );
184                            }
185                        }
186
187                        Selection::FragmentSpread(spread) => {
188                            if let Some(fragment) =
189                                self.doc.fragments.get(spread.fragment_name.as_str())
190                                && self.fragment_matches(data, fragment.type_condition())
191                            {
192                                let mapped = self.map_data(data, &fragment.selection_set);
193                                if let Some(fragment_map) = mapped.as_object() {
194                                    new_map.extend(fragment_map.clone());
195                                }
196                            }
197                        }
198
199                        Selection::InlineFragment(fragment) => {
200                            if let Some(type_condition) = &fragment.type_condition
201                                && !self.fragment_matches(data, type_condition)
202                            {
203                                continue;
204                            }
205                            let mapped = self.map_data(data, &fragment.selection_set);
206                            if let Some(fragment_map) = mapped.as_object() {
207                                new_map.extend(fragment_map.clone());
208                            }
209                        }
210                    }
211                }
212
213                Value::Object(new_map)
214            }
215
216            Value::Array(items) => Value::Array(
217                items
218                    .iter()
219                    .map(|item| self.map_data(item, selection_set))
220                    .collect(),
221            ),
222
223            primitive => primitive.clone(),
224        }
225    }
226}
227
228// If the user has set a custom success condition selector, resolve that expression,
229// otherwise default to checking status code is 2XX
230fn is_success(
231    connector: &Connector,
232    data: &Value,
233    parts: &Parts,
234    inputs: &IndexMap<String, Value>,
235    mut warnings: Vec<Problem>,
236) -> (bool, Vec<Problem>) {
237    let Some(is_success_selection) = &connector.error_settings.connect_is_success else {
238        return (parts.status.is_success(), warnings);
239    };
240    let (res, apply_to_errors) = is_success_selection.apply_with_vars(data, inputs);
241    warnings.extend(aggregate_apply_to_errors(
242        apply_to_errors,
243        ProblemLocation::IsSuccess,
244    ));
245
246    (
247        res.as_ref().and_then(Value::as_bool).unwrap_or_default(),
248        warnings,
249    )
250}
251
252/// Returns a response with data transformed by the selection mapping.
253pub(super) fn map_response(
254    data: &Value,
255    key: ResponseKey,
256    inputs: IndexMap<String, Value>,
257    mut warnings: Vec<Problem>,
258) -> MappedResponse {
259    let (res, apply_to_errors) = key.selection().apply_with_vars(data, &inputs);
260    warnings.extend(aggregate_apply_to_errors(
261        apply_to_errors,
262        ProblemLocation::Selection,
263    ));
264    MappedResponse::Data {
265        key,
266        data: res.unwrap_or_else(|| Value::Null),
267        problems: warnings,
268    }
269}
270
271/// Returns a `MappedResponse` with a GraphQL error.
272pub(super) fn map_error(
273    connector: &Connector,
274    data: &Value,
275    parts: &Parts,
276    key: ResponseKey,
277    inputs: IndexMap<String, Value>,
278    mut warnings: Vec<Problem>,
279) -> MappedResponse {
280    // Do we have an error message mapping set for this connector?
281    let message = if let Some(message_selection) = &connector.error_settings.message {
282        let (res, apply_to_errors) = message_selection.apply_with_vars(data, &inputs);
283        warnings.extend(aggregate_apply_to_errors(
284            apply_to_errors,
285            ProblemLocation::ErrorsMessage,
286        ));
287        res.as_ref()
288            .and_then(Value::as_str)
289            .unwrap_or_default()
290            .to_string()
291    } else {
292        "Request failed".to_string()
293    };
294
295    // Now we can create the error object using either the default message or the message calculated by the JSONSelection
296    let mut error = RuntimeError::new(message, &key);
297    error.subgraph_name = Some(connector.id.subgraph_name.clone());
298    error.coordinate = Some(connector.id.coordinate());
299
300    // First, we will apply defaults... these may get overwritten below by user configured extensions
301    error = error.extension(
302        "http",
303        Value::Object(Map::from_iter([(
304            "status".into(),
305            Value::Number(parts.status.as_u16().into()),
306        )])),
307    );
308
309    // If we have error extensions mapping set for this connector, we will need to grab the code + the remaining extensions and map them to the error object
310    // We'll merge by applying the source and then the connect. Keep in mind that these will override defaults if the key names are the same.
311    // Note: that we set the extension code in this if/else but don't actually set it on the error until after the if/else. This is because the compiler
312    // can't make sense of it in the if/else due to how the builder is constructed.
313    let mut extension_code = "CONNECTOR_FETCH".to_string();
314    if let Some(extensions_selection) = &connector.error_settings.source_extensions {
315        let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
316        warnings.extend(aggregate_apply_to_errors(
317            apply_to_errors,
318            ProblemLocation::SourceErrorsExtensions,
319        ));
320
321        // TODO: Currently this "fails silently". In the future, we probably add a warning to the debugger info.
322        let extensions = res
323            .and_then(|e| match e {
324                Value::Object(map) => Some(map),
325                _ => None,
326            })
327            .unwrap_or_default();
328
329        if let Some(code) = extensions.get("code") {
330            extension_code = code.as_str().unwrap_or_default().to_string();
331        }
332
333        for (key, value) in extensions {
334            error = error.extension(key, value);
335        }
336    }
337
338    if let Some(extensions_selection) = &connector.error_settings.connect_extensions {
339        let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
340        warnings.extend(aggregate_apply_to_errors(
341            apply_to_errors,
342            ProblemLocation::ConnectErrorsExtensions,
343        ));
344
345        // TODO: Currently this "fails silently". In the future, we probably add a warning to the debugger info.
346        let extensions = res
347            .and_then(|e| match e {
348                Value::Object(map) => Some(map),
349                _ => None,
350            })
351            .unwrap_or_default();
352
353        if let Some(code) = extensions.get("code") {
354            extension_code = code.as_str().unwrap_or_default().to_string();
355        }
356
357        for (key, value) in extensions {
358            error = error.extension(key, value);
359        }
360    }
361
362    error = error.with_code(extension_code);
363
364    MappedResponse::Error {
365        error,
366        key,
367        problems: warnings,
368    }
369}
// --- MAPPED RESPONSE ---------------------------------------------------------
/// Result of mapping a connector response: either mapped data or a runtime
/// error. Both variants keep the [`ResponseKey`] that identifies where the
/// result belongs and the problems collected while applying mappings.
#[derive(Debug)]
pub enum MappedResponse {
    /// This is equivalent to RawResponse::Error, but it also represents errors
    /// when the request is semantically unsuccessful (e.g. 404, 500).
    Error {
        /// The GraphQL error to report.
        error: RuntimeError,
        /// Identifies the field or entity this error relates to.
        key: ResponseKey,
        /// Problems gathered while evaluating mapping expressions.
        problems: Vec<Problem>,
    },
    /// The response data after applying the selection mapping.
    Data {
        /// The mapped JSON value.
        data: Value,
        /// Identifies the field or entity this data belongs to.
        key: ResponseKey,
        /// Problems gathered while evaluating mapping expressions.
        problems: Vec<Problem>,
    },
}
387
388impl MappedResponse {
389    /// Adds the response data to the `data` map or the error to the `errors`
390    /// array. How data is added depends on the `ResponseKey`: it's either a
391    /// property directly on the map, or stored in the `_entities` array.
392    pub fn add_to_data(
393        self,
394        data: &mut Map<ByteString, Value>,
395        errors: &mut Vec<RuntimeError>,
396        count: usize,
397    ) -> Result<(), HandleResponseError> {
398        match self {
399            Self::Error { error, key, .. } => {
400                match key {
401                    // add a null to the "_entities" array at the right index
402                    ResponseKey::Entity { index, .. } | ResponseKey::EntityField { index, .. } => {
403                        let entities = data
404                            .entry(ENTITIES)
405                            .or_insert(Value::Array(Vec::with_capacity(count)));
406                        entities
407                            .as_array_mut()
408                            .ok_or_else(|| {
409                                HandleResponseError::MergeError("_entities is not an array".into())
410                            })?
411                            .insert(index, Value::Null);
412                    }
413                    _ => {}
414                };
415                errors.push(error);
416            }
417            Self::Data {
418                data: value, key, ..
419            } => match key {
420                ResponseKey::RootField { ref name, .. } => {
421                    data.insert(name.clone(), value);
422                }
423                ResponseKey::Entity { index, .. } => {
424                    let entities = data
425                        .entry(ENTITIES)
426                        .or_insert(Value::Array(Vec::with_capacity(count)));
427                    entities
428                        .as_array_mut()
429                        .ok_or_else(|| {
430                            HandleResponseError::MergeError("_entities is not an array".into())
431                        })?
432                        .insert(index, value);
433                }
434                ResponseKey::EntityField {
435                    index,
436                    ref field_name,
437                    ref typename,
438                    ..
439                } => {
440                    let entities = data
441                        .entry(ENTITIES)
442                        .or_insert(Value::Array(Vec::with_capacity(count)))
443                        .as_array_mut()
444                        .ok_or_else(|| {
445                            HandleResponseError::MergeError("_entities is not an array".into())
446                        })?;
447
448                    match entities.get_mut(index) {
449                        Some(Value::Object(entity)) => {
450                            entity.insert(field_name.clone(), value);
451                        }
452                        _ => {
453                            let mut entity = Map::new();
454                            if let Some(typename) = typename {
455                                entity.insert(TYPENAME, Value::String(typename.as_str().into()));
456                            }
457                            entity.insert(field_name.clone(), value);
458                            entities.insert(index, Value::Object(entity));
459                        }
460                    };
461                }
462                ResponseKey::BatchEntity {
463                    selection,
464                    keys,
465                    inputs,
466                } => {
467                    let Value::Array(values) = value else {
468                        return Err(HandleResponseError::MergeError(
469                            "Response for a batch request does not map to an array".into(),
470                        ));
471                    };
472
473                    let spec = selection.spec();
474                    let key_selection = JSONSelection::parse_with_spec(
475                        &keys.serialize().no_indent().to_string(),
476                        spec,
477                    )
478                    .map_err(|e| HandleResponseError::MergeError(e.to_string()))?;
479
480                    // Convert representations into keys for use in the map
481                    let key_values = inputs.batch.iter().map(|v| {
482                        key_selection
483                            .apply_to(&Value::Object(v.clone()))
484                            .0
485                            .unwrap_or(Value::Null)
486                    });
487
488                    // Create a map of keys to entities
489                    let mut map = values
490                        .into_iter()
491                        .filter_map(|v| key_selection.apply_to(&v).0.map(|key| (key, v)))
492                        .collect::<HashMap<_, _>>();
493
494                    // Make a list of entities that matches the representations list
495                    let new_entities = key_values
496                        .map(|key| map.remove(&key).unwrap_or(Value::Null))
497                        .collect_vec();
498
499                    // Because we may have multiple batch entities requests, we should add to ENTITIES as the requests come in so it is additive
500                    let entities = data
501                        .entry(ENTITIES)
502                        .or_insert(Value::Array(Vec::with_capacity(count)));
503
504                    entities
505                        .as_array_mut()
506                        .ok_or_else(|| {
507                            HandleResponseError::MergeError("_entities is not an array".into())
508                        })?
509                        .extend(new_entities);
510                }
511            },
512        }
513
514        Ok(())
515    }
516
517    pub fn problems(&self) -> &[Problem] {
518        match self {
519            Self::Error { problems, .. } | Self::Data { problems, .. } => problems,
520        }
521    }
522
523    /// Applies the given GraphQL operation (note: must be a single operation!)
524    /// to the [`MappedResponse`] to produce a new [`MappedResponse`] with
525    /// GraphQL transforms like alias renaming applied.
526    ///
527    /// The `operation_option` parameter is an [`Option<&ExecutableDocument>`]
528    /// to simplify cases where you might not have an [`ExecutableDocument`]
529    /// available (hence `None`). When `operation_option.is_none()`, note that
530    /// `subtypes` is ignored.
531    ///
532    /// The `subtypes` parameter is necessary for handling abstract fragment
533    /// type conditions, since that information is not preserved in
534    /// [`ExecutableDocument`].
535    pub fn apply_operation(
536        self, // NOTE: Takes ownership of self!
537        operation_option: Option<&ExecutableDocument>,
538        subtypes: &IndexMap<String, IndexSet<String>>,
539    ) -> Self {
540        match (self, operation_option) {
541            (
542                Self::Data {
543                    data,
544                    key,
545                    problems,
546                },
547                Some(operation),
548            ) => {
549                let single_op = operation
550                    .operations
551                    .anonymous
552                    .as_ref()
553                    .or_else(|| operation.operations.named.values().next());
554
555                let data = if let Some(op) = single_op {
556                    let mut new_sub = SelectionSet::new(op.selection_set.ty.clone());
557
558                    match &key {
559                        ResponseKey::RootField { name, .. } => {
560                            for field in op.selection_set.selections.iter() {
561                                if let Selection::Field(field) = field
562                                    && field.name.as_str() == name.as_str()
563                                {
564                                    new_sub
565                                        .selections
566                                        .extend(field.selection_set.selections.iter().cloned());
567                                }
568                            }
569                        }
570
571                        ResponseKey::EntityField { field_name, .. } => {
572                            let field_str = field_name.as_str();
573
574                            for selection in op.selection_set.selections.iter() {
575                                if let Selection::Field(field) = selection
576                                    && field.name.as_str() == "_entities"
577                                {
578                                    for ent_sel in field.selection_set.selections.iter() {
579                                        // Selection::InlineFragment is what we
580                                        // actually expect, but we could handle
581                                        // ::Field and ::FragmentSpread too if
582                                        // necessary.
583                                        match ent_sel {
584                                            Selection::InlineFragment(frag) => {
585                                                for field_sel in
586                                                    frag.selection_set.selections.iter()
587                                                {
588                                                    if let Selection::Field(field) = field_sel
589                                                        && field.name.as_str() == field_str
590                                                    {
591                                                        new_sub.selections.extend(
592                                                            field
593                                                                .selection_set
594                                                                .selections
595                                                                .iter()
596                                                                .cloned(),
597                                                        );
598                                                    }
599                                                }
600                                            }
601
602                                            Selection::Field(field) => {
603                                                if field.name.as_str() == field_str {
604                                                    new_sub.selections.extend(
605                                                        field
606                                                            .selection_set
607                                                            .selections
608                                                            .iter()
609                                                            .cloned(),
610                                                    );
611                                                }
612                                            }
613
614                                            Selection::FragmentSpread(spread) => {
615                                                if let Some(fragment) = operation
616                                                    .fragments
617                                                    .get(spread.fragment_name.as_str())
618                                                {
619                                                    for field_sel in
620                                                        fragment.selection_set.selections.iter()
621                                                    {
622                                                        if let Selection::Field(field) = field_sel
623                                                            && field.name.as_str() == field_str
624                                                        {
625                                                            new_sub.selections.extend(
626                                                                field
627                                                                    .selection_set
628                                                                    .selections
629                                                                    .iter()
630                                                                    .cloned(),
631                                                            );
632                                                        }
633                                                    }
634                                                }
635                                            }
636                                        }
637                                    }
638                                }
639                            }
640                        }
641
642                        ResponseKey::Entity { .. } => {
643                            for selection in op.selection_set.selections.iter() {
644                                if let Selection::Field(field) = selection
645                                    && field.name.as_str() == "_entities"
646                                {
647                                    new_sub
648                                        .selections
649                                        .extend(field.selection_set.selections.iter().cloned());
650                                }
651                            }
652                        }
653
654                        ResponseKey::BatchEntity { keys, .. } => {
655                            new_sub
656                                .selections
657                                .extend(keys.selection_set.selections.iter().cloned());
658
659                            for selection in op.selection_set.selections.iter() {
660                                if let Selection::Field(field) = selection
661                                    && field.name.as_str() == "_entities"
662                                {
663                                    new_sub
664                                        .selections
665                                        .extend(field.selection_set.selections.iter().cloned());
666                                }
667                            }
668                        }
669                    };
670
671                    GraphQLDataMapper::new(operation, subtypes).map_data(&data, &new_sub)
672                } else {
673                    data
674                };
675
676                Self::Data {
677                    data,
678                    key,
679                    problems,
680                }
681            }
682
683            // We do not transform errors using the operation.
684            (
685                MappedResponse::Error {
686                    error,
687                    key,
688                    problems,
689                },
690                Some(_),
691            ) => MappedResponse::Error {
692                error,
693                key,
694                problems,
695            },
696
697            // When operation_option.is_none(), return self unmodified.
698            (mapped, None) => mapped,
699        }
700    }
701}
702
#[cfg(test)]
mod tests {
    use http::HeaderMap;
    use http::HeaderValue;
    use serde_json_bytes::Value;

    use super::deserialize_response;

    /// Builds a header map from `(name, value)` string pairs, panicking on
    /// names or values that are not valid HTTP headers.
    fn headers_with(pairs: &[(&str, &str)]) -> HeaderMap {
        pairs
            .iter()
            .fold(HeaderMap::new(), |mut headers, (name, value)| {
                headers.insert(
                    http::header::HeaderName::from_bytes(name.as_bytes()).unwrap(),
                    HeaderValue::from_str(value).unwrap(),
                );
                headers
            })
    }

    /// A spec-compliant 204 has neither a body nor a Content-Length header.
    #[test]
    fn empty_body_no_content_length_returns_null() {
        let no_headers = HeaderMap::new();
        let parsed = deserialize_response(b"", &no_headers).unwrap();
        assert_eq!(parsed, Value::Null);
    }

    /// Some servers send `Content-Length: 0` on a 204 even though they
    /// should not; that must still map to null.
    #[test]
    fn empty_body_with_content_length_zero_returns_null() {
        let zero_length = headers_with(&[("content-length", "0")]);
        let parsed = deserialize_response(b"", &zero_length).unwrap();
        assert_eq!(parsed, Value::Null);
    }
}