Skip to main content

apollo_federation/connectors/runtime/
responses.rs

1use apollo_compiler::ExecutableDocument;
2use apollo_compiler::Name;
3use apollo_compiler::collections::HashMap;
4use apollo_compiler::collections::IndexMap;
5use apollo_compiler::collections::IndexSet;
6use apollo_compiler::executable::Selection;
7use apollo_compiler::executable::SelectionSet;
8use encoding_rs::Encoding;
9use encoding_rs::UTF_8;
10use http::HeaderMap;
11use http::HeaderValue;
12use http::header::CONTENT_LENGTH;
13use http::header::CONTENT_TYPE;
14use http::response::Parts;
15use itertools::Itertools;
16use mime::Mime;
17use serde_json_bytes::ByteString;
18use serde_json_bytes::Map;
19use serde_json_bytes::Value;
20
21use crate::connectors::Connector;
22use crate::connectors::JSONSelection;
23use crate::connectors::ProblemLocation;
24use crate::connectors::runtime::errors::RuntimeError;
25use crate::connectors::runtime::inputs::ContextReader;
26use crate::connectors::runtime::key::ResponseKey;
27use crate::connectors::runtime::mapping::Problem;
28use crate::connectors::runtime::mapping::aggregate_apply_to_errors;
29use crate::connectors::runtime::responses::DeserializeError::ContentDecoding;
30
31const ENTITIES: &str = "_entities";
32const TYPENAME: &str = "__typename";
33
34#[derive(Debug, thiserror::Error)]
35pub enum HandleResponseError {
36    #[error("Merge error: {0}")]
37    MergeError(String),
38}
39
40/// Converts a response body into a json Value based on the Content-Type header.
41pub fn deserialize_response(body: &[u8], headers: &HeaderMap) -> Result<Value, DeserializeError> {
42    // If the body is obviously empty, don't try to parse it
43    if headers
44        .get(CONTENT_LENGTH)
45        .and_then(|len| len.to_str().ok())
46        .and_then(|s| s.parse::<usize>().ok())
47        .is_some_and(|content_length| content_length == 0)
48    {
49        return Ok(Value::Null);
50    }
51
52    let content_type = headers
53        .get(CONTENT_TYPE)
54        .and_then(|h| h.to_str().ok()?.parse::<Mime>().ok());
55
56    if content_type.is_none()
57        || content_type
58            .as_ref()
59            .is_some_and(|ct| ct.subtype() == mime::JSON || ct.suffix() == Some(mime::JSON))
60    {
61        // Treat any JSON-y like content types as JSON
62        // Also, because the HTTP spec says we should effectively "guess" the content type if there is no content type (None), we're going to guess it is JSON if the server has not specified one
63        serde_json::from_slice::<Value>(body).map_err(DeserializeError::SerdeJson)
64    } else if content_type
65        .as_ref()
66        .is_some_and(|ct| ct.type_() == mime::TEXT && ct.subtype() == mime::PLAIN)
67    {
68        // Plain text we can't parse as JSON so we'll instead return it as a JSON string
69        // Before we can do that, we need to figure out the charset and attempt to decode the string
70        let encoding = content_type
71            .as_ref()
72            .and_then(|ct| Encoding::for_label(ct.get_param("charset")?.as_str().as_bytes()))
73            .unwrap_or(UTF_8);
74        let (decoded_body, _, had_errors) = encoding.decode(body);
75
76        if had_errors {
77            return Err(ContentDecoding(encoding.name()));
78        }
79
80        Ok(Value::String(decoded_body.into_owned().into()))
81    } else {
82        // For any other content types, all we can do is treat it as a JSON null cause we don't know what it is
83        Ok(Value::Null)
84    }
85}
86
87#[derive(Debug, thiserror::Error)]
88pub enum DeserializeError {
89    #[error("Could not parse JSON: {0}")]
90    SerdeJson(#[source] serde_json::Error),
91    #[error("Could not decode data with content encoding {0}")]
92    ContentDecoding(&'static str),
93}
94
95pub fn handle_raw_response(
96    data: &Value,
97    parts: &Parts,
98    key: ResponseKey,
99    connector: &Connector,
100    context: impl ContextReader,
101    client_headers: &HeaderMap<HeaderValue>,
102) -> MappedResponse {
103    let inputs = key
104        .inputs()
105        .clone()
106        .merger(&connector.response_variable_keys)
107        .config(connector.config.as_ref())
108        .context(context)
109        .status(parts.status.as_u16())
110        .request(&connector.response_headers, client_headers)
111        .response(&connector.response_headers, Some(parts))
112        .merge();
113    let warnings = Vec::new();
114    let (success, warnings) = is_success(connector, data, parts, &inputs, warnings);
115    if success {
116        map_response(data, key, inputs, warnings)
117    } else {
118        map_error(connector, data, parts, key, inputs, warnings)
119    }
120}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123struct GraphQLDataMapper<'a> {
124    doc: &'a ExecutableDocument,
125    subtypes_map: &'a IndexMap<String, IndexSet<String>>,
126}
127
128impl<'a> GraphQLDataMapper<'a> {
129    fn new(
130        doc: &'a ExecutableDocument,
131        subtypes_map: &'a IndexMap<String, IndexSet<String>>,
132    ) -> Self {
133        Self { doc, subtypes_map }
134    }
135
136    fn fragment_matches(&self, data: &Value, fragment_type_condition: &Name) -> bool {
137        if let Some(data_typename) = data.get("__typename") {
138            match data_typename {
139                Value::String(typename) => {
140                    self.supertype_has_subtype(fragment_type_condition.as_str(), typename.as_str())
141                }
142                _ => false,
143            }
144        } else {
145            true
146        }
147    }
148
149    fn supertype_has_subtype(&self, supertype: &str, subtype: &str) -> bool {
150        if supertype == subtype {
151            true
152        } else if let Some(subtypes) = self.subtypes_map.get(supertype) {
153            subtypes
154                .iter()
155                .any(|s| self.supertype_has_subtype(s, subtype))
156        } else {
157            false
158        }
159    }
160
161    fn map_data(&self, data: &Value, selection_set: &SelectionSet) -> Value {
162        if selection_set.selections.is_empty() {
163            return data.clone();
164        }
165
166        match data {
167            Value::Object(map) => {
168                let mut new_map = Map::new();
169
170                for field in selection_set.selections.iter() {
171                    match field {
172                        Selection::Field(field) => {
173                            if let Some(field_value) = map.get(field.name.as_str()) {
174                                let output_field_name = field.alias.as_ref().unwrap_or(&field.name);
175                                new_map.insert(
176                                    output_field_name.to_string(),
177                                    self.map_data(field_value, &field.selection_set),
178                                );
179                            }
180                        }
181
182                        Selection::FragmentSpread(spread) => {
183                            if let Some(fragment) =
184                                self.doc.fragments.get(spread.fragment_name.as_str())
185                                && self.fragment_matches(data, fragment.type_condition())
186                            {
187                                let mapped = self.map_data(data, &fragment.selection_set);
188                                if let Some(fragment_map) = mapped.as_object() {
189                                    new_map.extend(fragment_map.clone());
190                                }
191                            }
192                        }
193
194                        Selection::InlineFragment(fragment) => {
195                            if let Some(type_condition) = &fragment.type_condition
196                                && !self.fragment_matches(data, type_condition)
197                            {
198                                continue;
199                            }
200                            let mapped = self.map_data(data, &fragment.selection_set);
201                            if let Some(fragment_map) = mapped.as_object() {
202                                new_map.extend(fragment_map.clone());
203                            }
204                        }
205                    }
206                }
207
208                Value::Object(new_map)
209            }
210
211            Value::Array(items) => Value::Array(
212                items
213                    .iter()
214                    .map(|item| self.map_data(item, selection_set))
215                    .collect(),
216            ),
217
218            primitive => primitive.clone(),
219        }
220    }
221}
222
223// If the user has set a custom success condition selector, resolve that expression,
224// otherwise default to checking status code is 2XX
225fn is_success(
226    connector: &Connector,
227    data: &Value,
228    parts: &Parts,
229    inputs: &IndexMap<String, Value>,
230    mut warnings: Vec<Problem>,
231) -> (bool, Vec<Problem>) {
232    let Some(is_success_selection) = &connector.error_settings.connect_is_success else {
233        return (parts.status.is_success(), warnings);
234    };
235    let (res, apply_to_errors) = is_success_selection.apply_with_vars(data, inputs);
236    warnings.extend(aggregate_apply_to_errors(
237        apply_to_errors,
238        ProblemLocation::IsSuccess,
239    ));
240
241    (
242        res.as_ref().and_then(Value::as_bool).unwrap_or_default(),
243        warnings,
244    )
245}
246
247/// Returns a response with data transformed by the selection mapping.
248pub(super) fn map_response(
249    data: &Value,
250    key: ResponseKey,
251    inputs: IndexMap<String, Value>,
252    mut warnings: Vec<Problem>,
253) -> MappedResponse {
254    let (res, apply_to_errors) = key.selection().apply_with_vars(data, &inputs);
255    warnings.extend(aggregate_apply_to_errors(
256        apply_to_errors,
257        ProblemLocation::Selection,
258    ));
259    MappedResponse::Data {
260        key,
261        data: res.unwrap_or_else(|| Value::Null),
262        problems: warnings,
263    }
264}
265
266/// Returns a `MappedResponse` with a GraphQL error.
267pub(super) fn map_error(
268    connector: &Connector,
269    data: &Value,
270    parts: &Parts,
271    key: ResponseKey,
272    inputs: IndexMap<String, Value>,
273    mut warnings: Vec<Problem>,
274) -> MappedResponse {
275    // Do we have an error message mapping set for this connector?
276    let message = if let Some(message_selection) = &connector.error_settings.message {
277        let (res, apply_to_errors) = message_selection.apply_with_vars(data, &inputs);
278        warnings.extend(aggregate_apply_to_errors(
279            apply_to_errors,
280            ProblemLocation::ErrorsMessage,
281        ));
282        res.as_ref()
283            .and_then(Value::as_str)
284            .unwrap_or_default()
285            .to_string()
286    } else {
287        "Request failed".to_string()
288    };
289
290    // Now we can create the error object using either the default message or the message calculated by the JSONSelection
291    let mut error = RuntimeError::new(message, &key);
292    error.subgraph_name = Some(connector.id.subgraph_name.clone());
293    error.coordinate = Some(connector.id.coordinate());
294
295    // First, we will apply defaults... these may get overwritten below by user configured extensions
296    error = error.extension(
297        "http",
298        Value::Object(Map::from_iter([(
299            "status".into(),
300            Value::Number(parts.status.as_u16().into()),
301        )])),
302    );
303
304    // If we have error extensions mapping set for this connector, we will need to grab the code + the remaining extensions and map them to the error object
305    // We'll merge by applying the source and then the connect. Keep in mind that these will override defaults if the key names are the same.
306    // Note: that we set the extension code in this if/else but don't actually set it on the error until after the if/else. This is because the compiler
307    // can't make sense of it in the if/else due to how the builder is constructed.
308    let mut extension_code = "CONNECTOR_FETCH".to_string();
309    if let Some(extensions_selection) = &connector.error_settings.source_extensions {
310        let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
311        warnings.extend(aggregate_apply_to_errors(
312            apply_to_errors,
313            ProblemLocation::SourceErrorsExtensions,
314        ));
315
316        // TODO: Currently this "fails silently". In the future, we probably add a warning to the debugger info.
317        let extensions = res
318            .and_then(|e| match e {
319                Value::Object(map) => Some(map),
320                _ => None,
321            })
322            .unwrap_or_default();
323
324        if let Some(code) = extensions.get("code") {
325            extension_code = code.as_str().unwrap_or_default().to_string();
326        }
327
328        for (key, value) in extensions {
329            error = error.extension(key, value);
330        }
331    }
332
333    if let Some(extensions_selection) = &connector.error_settings.connect_extensions {
334        let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
335        warnings.extend(aggregate_apply_to_errors(
336            apply_to_errors,
337            ProblemLocation::ConnectErrorsExtensions,
338        ));
339
340        // TODO: Currently this "fails silently". In the future, we probably add a warning to the debugger info.
341        let extensions = res
342            .and_then(|e| match e {
343                Value::Object(map) => Some(map),
344                _ => None,
345            })
346            .unwrap_or_default();
347
348        if let Some(code) = extensions.get("code") {
349            extension_code = code.as_str().unwrap_or_default().to_string();
350        }
351
352        for (key, value) in extensions {
353            error = error.extension(key, value);
354        }
355    }
356
357    error = error.with_code(extension_code);
358
359    MappedResponse::Error {
360        error,
361        key,
362        problems: warnings,
363    }
364}
365// --- MAPPED RESPONSE ---------------------------------------------------------
366#[derive(Debug)]
367pub enum MappedResponse {
368    /// This is equivalent to RawResponse::Error, but it also represents errors
369    /// when the request is semantically unsuccessful (e.g. 404, 500).
370    Error {
371        error: RuntimeError,
372        key: ResponseKey,
373        problems: Vec<Problem>,
374    },
375    /// The response data after applying the selection mapping.
376    Data {
377        data: Value,
378        key: ResponseKey,
379        problems: Vec<Problem>,
380    },
381}
382
383impl MappedResponse {
384    /// Adds the response data to the `data` map or the error to the `errors`
385    /// array. How data is added depends on the `ResponseKey`: it's either a
386    /// property directly on the map, or stored in the `_entities` array.
387    pub fn add_to_data(
388        self,
389        data: &mut Map<ByteString, Value>,
390        errors: &mut Vec<RuntimeError>,
391        count: usize,
392    ) -> Result<(), HandleResponseError> {
393        match self {
394            Self::Error { error, key, .. } => {
395                match key {
396                    // add a null to the "_entities" array at the right index
397                    ResponseKey::Entity { index, .. } | ResponseKey::EntityField { index, .. } => {
398                        let entities = data
399                            .entry(ENTITIES)
400                            .or_insert(Value::Array(Vec::with_capacity(count)));
401                        entities
402                            .as_array_mut()
403                            .ok_or_else(|| {
404                                HandleResponseError::MergeError("_entities is not an array".into())
405                            })?
406                            .insert(index, Value::Null);
407                    }
408                    _ => {}
409                };
410                errors.push(error);
411            }
412            Self::Data {
413                data: value, key, ..
414            } => match key {
415                ResponseKey::RootField { ref name, .. } => {
416                    data.insert(name.clone(), value);
417                }
418                ResponseKey::Entity { index, .. } => {
419                    let entities = data
420                        .entry(ENTITIES)
421                        .or_insert(Value::Array(Vec::with_capacity(count)));
422                    entities
423                        .as_array_mut()
424                        .ok_or_else(|| {
425                            HandleResponseError::MergeError("_entities is not an array".into())
426                        })?
427                        .insert(index, value);
428                }
429                ResponseKey::EntityField {
430                    index,
431                    ref field_name,
432                    ref typename,
433                    ..
434                } => {
435                    let entities = data
436                        .entry(ENTITIES)
437                        .or_insert(Value::Array(Vec::with_capacity(count)))
438                        .as_array_mut()
439                        .ok_or_else(|| {
440                            HandleResponseError::MergeError("_entities is not an array".into())
441                        })?;
442
443                    match entities.get_mut(index) {
444                        Some(Value::Object(entity)) => {
445                            entity.insert(field_name.clone(), value);
446                        }
447                        _ => {
448                            let mut entity = Map::new();
449                            if let Some(typename) = typename {
450                                entity.insert(TYPENAME, Value::String(typename.as_str().into()));
451                            }
452                            entity.insert(field_name.clone(), value);
453                            entities.insert(index, Value::Object(entity));
454                        }
455                    };
456                }
457                ResponseKey::BatchEntity {
458                    selection,
459                    keys,
460                    inputs,
461                } => {
462                    let Value::Array(values) = value else {
463                        return Err(HandleResponseError::MergeError(
464                            "Response for a batch request does not map to an array".into(),
465                        ));
466                    };
467
468                    let spec = selection.spec();
469                    let key_selection = JSONSelection::parse_with_spec(
470                        &keys.serialize().no_indent().to_string(),
471                        spec,
472                    )
473                    .map_err(|e| HandleResponseError::MergeError(e.to_string()))?;
474
475                    // Convert representations into keys for use in the map
476                    let key_values = inputs.batch.iter().map(|v| {
477                        key_selection
478                            .apply_to(&Value::Object(v.clone()))
479                            .0
480                            .unwrap_or(Value::Null)
481                    });
482
483                    // Create a map of keys to entities
484                    let mut map = values
485                        .into_iter()
486                        .filter_map(|v| key_selection.apply_to(&v).0.map(|key| (key, v)))
487                        .collect::<HashMap<_, _>>();
488
489                    // Make a list of entities that matches the representations list
490                    let new_entities = key_values
491                        .map(|key| map.remove(&key).unwrap_or(Value::Null))
492                        .collect_vec();
493
494                    // Because we may have multiple batch entities requests, we should add to ENTITIES as the requests come in so it is additive
495                    let entities = data
496                        .entry(ENTITIES)
497                        .or_insert(Value::Array(Vec::with_capacity(count)));
498
499                    entities
500                        .as_array_mut()
501                        .ok_or_else(|| {
502                            HandleResponseError::MergeError("_entities is not an array".into())
503                        })?
504                        .extend(new_entities);
505                }
506            },
507        }
508
509        Ok(())
510    }
511
512    pub fn problems(&self) -> &[Problem] {
513        match self {
514            Self::Error { problems, .. } | Self::Data { problems, .. } => problems,
515        }
516    }
517
518    /// Applies the given GraphQL operation (note: must be a single operation!)
519    /// to the [`MappedResponse`] to produce a new [`MappedResponse`] with
520    /// GraphQL transforms like alias renaming applied.
521    ///
522    /// The `operation_option` parameter is an [`Option<&ExecutableDocument>`]
523    /// to simplify cases where you might not have an [`ExecutableDocument`]
524    /// available (hence `None`). When `operation_option.is_none()`, note that
525    /// `subtypes` is ignored.
526    ///
527    /// The `subtypes` parameter is necessary for handling abstract fragment
528    /// type conditions, since that information is not preserved in
529    /// [`ExecutableDocument`].
530    pub fn apply_operation(
531        self, // NOTE: Takes ownership of self!
532        operation_option: Option<&ExecutableDocument>,
533        subtypes: &IndexMap<String, IndexSet<String>>,
534    ) -> Self {
535        match (self, operation_option) {
536            (
537                Self::Data {
538                    data,
539                    key,
540                    problems,
541                },
542                Some(operation),
543            ) => {
544                let single_op = operation
545                    .operations
546                    .anonymous
547                    .as_ref()
548                    .or_else(|| operation.operations.named.values().next());
549
550                let data = if let Some(op) = single_op {
551                    let mut new_sub = SelectionSet::new(op.selection_set.ty.clone());
552
553                    match &key {
554                        ResponseKey::RootField { name, .. } => {
555                            for field in op.selection_set.selections.iter() {
556                                if let Selection::Field(field) = field
557                                    && field.name.as_str() == name.as_str()
558                                {
559                                    new_sub
560                                        .selections
561                                        .extend(field.selection_set.selections.iter().cloned());
562                                }
563                            }
564                        }
565
566                        ResponseKey::EntityField { field_name, .. } => {
567                            let field_str = field_name.as_str();
568
569                            for selection in op.selection_set.selections.iter() {
570                                if let Selection::Field(field) = selection
571                                    && field.name.as_str() == "_entities"
572                                {
573                                    for ent_sel in field.selection_set.selections.iter() {
574                                        // Selection::InlineFragment is what we
575                                        // actually expect, but we could handle
576                                        // ::Field and ::FragmentSpread too if
577                                        // necessary.
578                                        match ent_sel {
579                                            Selection::InlineFragment(frag) => {
580                                                for field_sel in
581                                                    frag.selection_set.selections.iter()
582                                                {
583                                                    if let Selection::Field(field) = field_sel
584                                                        && field.name.as_str() == field_str
585                                                    {
586                                                        new_sub.selections.extend(
587                                                            field
588                                                                .selection_set
589                                                                .selections
590                                                                .iter()
591                                                                .cloned(),
592                                                        );
593                                                    }
594                                                }
595                                            }
596
597                                            Selection::Field(field) => {
598                                                if field.name.as_str() == field_str {
599                                                    new_sub.selections.extend(
600                                                        field
601                                                            .selection_set
602                                                            .selections
603                                                            .iter()
604                                                            .cloned(),
605                                                    );
606                                                }
607                                            }
608
609                                            Selection::FragmentSpread(spread) => {
610                                                if let Some(fragment) = operation
611                                                    .fragments
612                                                    .get(spread.fragment_name.as_str())
613                                                {
614                                                    for field_sel in
615                                                        fragment.selection_set.selections.iter()
616                                                    {
617                                                        if let Selection::Field(field) = field_sel
618                                                            && field.name.as_str() == field_str
619                                                        {
620                                                            new_sub.selections.extend(
621                                                                field
622                                                                    .selection_set
623                                                                    .selections
624                                                                    .iter()
625                                                                    .cloned(),
626                                                            );
627                                                        }
628                                                    }
629                                                }
630                                            }
631                                        }
632                                    }
633                                }
634                            }
635                        }
636
637                        ResponseKey::Entity { .. } => {
638                            for selection in op.selection_set.selections.iter() {
639                                if let Selection::Field(field) = selection
640                                    && field.name.as_str() == "_entities"
641                                {
642                                    new_sub
643                                        .selections
644                                        .extend(field.selection_set.selections.iter().cloned());
645                                }
646                            }
647                        }
648
649                        ResponseKey::BatchEntity { keys, .. } => {
650                            new_sub
651                                .selections
652                                .extend(keys.selection_set.selections.iter().cloned());
653
654                            for selection in op.selection_set.selections.iter() {
655                                if let Selection::Field(field) = selection
656                                    && field.name.as_str() == "_entities"
657                                {
658                                    new_sub
659                                        .selections
660                                        .extend(field.selection_set.selections.iter().cloned());
661                                }
662                            }
663                        }
664                    };
665
666                    GraphQLDataMapper::new(operation, subtypes).map_data(&data, &new_sub)
667                } else {
668                    data
669                };
670
671                Self::Data {
672                    data,
673                    key,
674                    problems,
675                }
676            }
677
678            // We do not transform errors using the operation.
679            (
680                MappedResponse::Error {
681                    error,
682                    key,
683                    problems,
684                },
685                Some(_),
686            ) => MappedResponse::Error {
687                error,
688                key,
689                problems,
690            },
691
692            // When operation_option.is_none(), return self unmodified.
693            (mapped, None) => mapped,
694        }
695    }
696}