1use apollo_compiler::ExecutableDocument;
2use apollo_compiler::Name;
3use apollo_compiler::collections::HashMap;
4use apollo_compiler::collections::IndexMap;
5use apollo_compiler::collections::IndexSet;
6use apollo_compiler::executable::Selection;
7use apollo_compiler::executable::SelectionSet;
8use encoding_rs::Encoding;
9use encoding_rs::UTF_8;
10use http::HeaderMap;
11use http::HeaderValue;
12use http::header::CONTENT_LENGTH;
13use http::header::CONTENT_TYPE;
14use http::response::Parts;
15use itertools::Itertools;
16use mime::Mime;
17use serde_json_bytes::ByteString;
18use serde_json_bytes::Map;
19use serde_json_bytes::Value;
20
21use crate::connectors::Connector;
22use crate::connectors::JSONSelection;
23use crate::connectors::ProblemLocation;
24use crate::connectors::runtime::errors::RuntimeError;
25use crate::connectors::runtime::inputs::ContextReader;
26use crate::connectors::runtime::key::ResponseKey;
27use crate::connectors::runtime::mapping::Problem;
28use crate::connectors::runtime::mapping::aggregate_apply_to_errors;
29use crate::connectors::runtime::responses::DeserializeError::ContentDecoding;
30
/// Key of the entities array inside the response data map.
const ENTITIES: &str = "_entities";
/// Name of the GraphQL type-name meta field.
const TYPENAME: &str = "__typename";
33
/// Errors raised while merging a mapped response into the response data.
#[derive(Debug, thiserror::Error)]
pub enum HandleResponseError {
    /// The mapped value could not be merged; the payload describes why.
    #[error("Merge error: {0}")]
    MergeError(String),
}
39
40pub fn deserialize_response(body: &[u8], headers: &HeaderMap) -> Result<Value, DeserializeError> {
42 if body.is_empty()
48 || headers
49 .get(CONTENT_LENGTH)
50 .and_then(|len| len.to_str().ok())
51 .and_then(|s| s.parse::<usize>().ok())
52 .is_some_and(|content_length| content_length == 0)
53 {
54 return Ok(Value::Null);
55 }
56
57 let content_type = headers
58 .get(CONTENT_TYPE)
59 .and_then(|h| h.to_str().ok()?.parse::<Mime>().ok());
60
61 if content_type.is_none()
62 || content_type
63 .as_ref()
64 .is_some_and(|ct| ct.subtype() == mime::JSON || ct.suffix() == Some(mime::JSON))
65 {
66 serde_json::from_slice::<Value>(body).map_err(DeserializeError::SerdeJson)
69 } else if content_type
70 .as_ref()
71 .is_some_and(|ct| ct.type_() == mime::TEXT && ct.subtype() == mime::PLAIN)
72 {
73 let encoding = content_type
76 .as_ref()
77 .and_then(|ct| Encoding::for_label(ct.get_param("charset")?.as_str().as_bytes()))
78 .unwrap_or(UTF_8);
79 let (decoded_body, _, had_errors) = encoding.decode(body);
80
81 if had_errors {
82 return Err(ContentDecoding(encoding.name()));
83 }
84
85 Ok(Value::String(decoded_body.into_owned().into()))
86 } else {
87 Ok(Value::Null)
89 }
90}
91
/// Errors produced by [`deserialize_response`].
#[derive(Debug, thiserror::Error)]
pub enum DeserializeError {
    /// The body could not be parsed as JSON.
    #[error("Could not parse JSON: {0}")]
    SerdeJson(#[source] serde_json::Error),
    /// The body could not be decoded as text; the payload is the name of the
    /// encoding involved.
    #[error("Could not decode data with content encoding {0}")]
    ContentDecoding(&'static str),
}
99
100pub fn handle_raw_response(
101 data: &Value,
102 parts: &Parts,
103 key: ResponseKey,
104 connector: &Connector,
105 context: impl ContextReader,
106 client_headers: &HeaderMap<HeaderValue>,
107) -> MappedResponse {
108 let inputs = key
109 .inputs()
110 .clone()
111 .merger(&connector.response_variable_keys)
112 .config(connector.config.as_ref())
113 .context(context)
114 .status(parts.status.as_u16())
115 .request(&connector.response_headers, client_headers)
116 .response(&connector.response_headers, Some(parts))
117 .merge();
118 let warnings = Vec::new();
119 let (success, warnings) = is_success(
120 connector.error_settings.connect_is_success.as_ref(),
121 data,
122 parts,
123 &inputs,
124 warnings,
125 );
126 if success {
127 map_response(data, key, inputs, warnings)
128 } else {
129 map_error(connector, data, parts, key, inputs, warnings)
130 }
131}
132
/// Shapes mapped JSON data according to a GraphQL selection set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct GraphQLDataMapper<'a> {
    /// Document whose named fragments are resolved for fragment spreads.
    doc: &'a ExecutableDocument,
    /// Supertype name -> names of its subtypes, used for type-condition checks.
    subtypes_map: &'a IndexMap<String, IndexSet<String>>,
}
138
139impl<'a> GraphQLDataMapper<'a> {
140 fn new(
141 doc: &'a ExecutableDocument,
142 subtypes_map: &'a IndexMap<String, IndexSet<String>>,
143 ) -> Self {
144 Self { doc, subtypes_map }
145 }
146
147 fn fragment_matches(&self, data: &Value, fragment_type_condition: &Name) -> bool {
148 if let Some(data_typename) = data.get("__typename") {
149 match data_typename {
150 Value::String(typename) => {
151 self.supertype_has_subtype(fragment_type_condition.as_str(), typename.as_str())
152 }
153 _ => false,
154 }
155 } else {
156 true
157 }
158 }
159
160 fn supertype_has_subtype(&self, supertype: &str, subtype: &str) -> bool {
161 if supertype == subtype {
162 true
163 } else if let Some(subtypes) = self.subtypes_map.get(supertype) {
164 subtypes
165 .iter()
166 .any(|s| self.supertype_has_subtype(s, subtype))
167 } else {
168 false
169 }
170 }
171
172 fn map_data(&self, data: &Value, selection_set: &SelectionSet) -> Value {
173 if selection_set.selections.is_empty() {
174 return data.clone();
175 }
176
177 match data {
178 Value::Object(map) => {
179 let mut new_map = Map::new();
180
181 for field in selection_set.selections.iter() {
182 match field {
183 Selection::Field(field) => {
184 if let Some(field_value) = map.get(field.name.as_str()) {
185 let output_field_name = field.alias.as_ref().unwrap_or(&field.name);
186 new_map.insert(
187 output_field_name.to_string(),
188 self.map_data(field_value, &field.selection_set),
189 );
190 } else if field.name == TYPENAME {
191 let output_field_name = field.alias.as_ref().unwrap_or(&field.name);
196 new_map.insert(
197 output_field_name.to_string(),
198 Value::String(selection_set.ty.to_string().into()),
199 );
200 }
201 }
202
203 Selection::FragmentSpread(spread) => {
204 if let Some(fragment) =
205 self.doc.fragments.get(spread.fragment_name.as_str())
206 && self.fragment_matches(data, fragment.type_condition())
207 {
208 let mapped = self.map_data(data, &fragment.selection_set);
209 if let Some(fragment_map) = mapped.as_object() {
210 new_map.extend(fragment_map.clone());
211 }
212 }
213 }
214
215 Selection::InlineFragment(fragment) => {
216 if let Some(type_condition) = &fragment.type_condition
217 && !self.fragment_matches(data, type_condition)
218 {
219 continue;
220 }
221 let mapped = self.map_data(data, &fragment.selection_set);
222 if let Some(fragment_map) = mapped.as_object() {
223 new_map.extend(fragment_map.clone());
224 }
225 }
226 }
227 }
228
229 Value::Object(new_map)
230 }
231
232 Value::Array(items) => Value::Array(
233 items
234 .iter()
235 .map(|item| self.map_data(item, selection_set))
236 .collect(),
237 ),
238
239 primitive => primitive.clone(),
240 }
241 }
242}
243
244fn is_success(
247 is_success_selection: Option<&JSONSelection>,
248 data: &Value,
249 parts: &Parts,
250 inputs: &IndexMap<String, Value>,
251 mut warnings: Vec<Problem>,
252) -> (bool, Vec<Problem>) {
253 let Some(is_success_selection) = is_success_selection else {
254 return (parts.status.is_success(), warnings);
255 };
256 let (res, apply_to_errors) = is_success_selection.apply_with_vars(data, inputs);
257 warnings.extend(aggregate_apply_to_errors(
258 apply_to_errors,
259 ProblemLocation::IsSuccess,
260 ));
261
262 let type_name = match res.as_ref() {
263 Some(Value::Bool(b)) => return (*b, warnings),
264 None => return (false, warnings),
265 Some(Value::Null) => "null",
266 Some(Value::Number(_)) => "number",
267 Some(Value::String(_)) => "string",
268 Some(Value::Array(_)) => "array",
269 Some(Value::Object(_)) => "object",
270 };
271 warnings.push(Problem {
272 message: format!("`isSuccess` must evaluate to a boolean, got {type_name}"),
273 path: String::new(),
274 count: 1,
275 location: ProblemLocation::IsSuccess,
276 });
277 (false, warnings)
278}
279
280pub fn handle_mapping_only_response(
284 key: ResponseKey,
285 connector: &Connector,
286 context: impl ContextReader,
287 client_headers: &HeaderMap<HeaderValue>,
288) -> MappedResponse {
289 let data = Value::Object(Map::new());
290 let inputs = key
291 .inputs()
292 .clone()
293 .merger(&connector.response_variable_keys)
294 .config(connector.config.as_ref())
295 .context(context)
296 .request(&connector.response_headers, client_headers)
297 .merge();
298 map_response(&data, key, inputs, Vec::new())
299}
300
301pub(super) fn map_response(
303 data: &Value,
304 key: ResponseKey,
305 inputs: IndexMap<String, Value>,
306 mut warnings: Vec<Problem>,
307) -> MappedResponse {
308 let (res, apply_to_errors) = key.selection().apply_with_vars(data, &inputs);
309 warnings.extend(aggregate_apply_to_errors(
310 apply_to_errors,
311 ProblemLocation::Selection,
312 ));
313 MappedResponse::Data {
314 key,
315 data: res.unwrap_or_else(|| Value::Null),
316 problems: warnings,
317 }
318}
319
320pub(super) fn map_error(
322 connector: &Connector,
323 data: &Value,
324 parts: &Parts,
325 key: ResponseKey,
326 inputs: IndexMap<String, Value>,
327 mut warnings: Vec<Problem>,
328) -> MappedResponse {
329 let message = if let Some(message_selection) = &connector.error_settings.message {
331 let (res, apply_to_errors) = message_selection.apply_with_vars(data, &inputs);
332 warnings.extend(aggregate_apply_to_errors(
333 apply_to_errors,
334 ProblemLocation::ErrorsMessage,
335 ));
336 res.as_ref()
337 .and_then(Value::as_str)
338 .unwrap_or_default()
339 .to_string()
340 } else {
341 "Request failed".to_string()
342 };
343
344 let mut error = RuntimeError::new(message, &key);
346 error.subgraph_name = Some(connector.id.subgraph_name.clone());
347 error.coordinate = Some(connector.id.coordinate());
348
349 error = error.extension(
351 "http",
352 Value::Object(Map::from_iter([(
353 "status".into(),
354 Value::Number(parts.status.as_u16().into()),
355 )])),
356 );
357
358 let mut extension_code = "CONNECTOR_FETCH".to_string();
365 if let Some(extensions_selection) = &connector.error_settings.source_extensions {
366 let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
367 warnings.extend(aggregate_apply_to_errors(
368 apply_to_errors,
369 ProblemLocation::SourceErrorsExtensions,
370 ));
371
372 let extensions = res
374 .and_then(|e| match e {
375 Value::Object(map) => Some(map),
376 _ => None,
377 })
378 .unwrap_or_default();
379
380 if let Some(code) = extensions.get("code") {
381 extension_code = code.as_str().unwrap_or_default().to_string();
382 }
383
384 for (key, value) in extensions {
385 error = error.merge_extension(key, value);
386 }
387 }
388
389 if let Some(extensions_selection) = &connector.error_settings.connect_extensions {
390 let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
391 warnings.extend(aggregate_apply_to_errors(
392 apply_to_errors,
393 ProblemLocation::ConnectErrorsExtensions,
394 ));
395
396 let extensions = res
398 .and_then(|e| match e {
399 Value::Object(map) => Some(map),
400 _ => None,
401 })
402 .unwrap_or_default();
403
404 if let Some(code) = extensions.get("code") {
405 extension_code = code.as_str().unwrap_or_default().to_string();
406 }
407
408 for (key, value) in extensions {
409 error = error.merge_extension(key, value);
410 }
411 }
412
413 error = error.with_code(extension_code);
414
415 MappedResponse::Error {
416 error,
417 key,
418 problems: warnings,
419 }
420}
/// Result of mapping one connector response.
#[derive(Debug)]
pub enum MappedResponse {
    /// The response was mapped to an error.
    Error {
        /// The error to report.
        error: RuntimeError,
        /// Key identifying where this response belongs in the overall result.
        key: ResponseKey,
        /// Problems collected while applying mapping selections.
        problems: Vec<Problem>,
    },
    /// The response was mapped to data.
    Data {
        /// The mapped value.
        data: Value,
        /// Key identifying where this response belongs in the overall result.
        key: ResponseKey,
        /// Problems collected while applying mapping selections.
        problems: Vec<Problem>,
    },
}
438
439impl MappedResponse {
440 pub fn add_to_data(
444 self,
445 data: &mut Map<ByteString, Value>,
446 errors: &mut Vec<RuntimeError>,
447 count: usize,
448 ) -> Result<(), HandleResponseError> {
449 match self {
450 Self::Error { error, key, .. } => {
451 match key {
452 ResponseKey::Entity { index, .. } | ResponseKey::EntityField { index, .. } => {
454 let entities = data
455 .entry(ENTITIES)
456 .or_insert(Value::Array(Vec::with_capacity(count)));
457 entities
458 .as_array_mut()
459 .ok_or_else(|| {
460 HandleResponseError::MergeError("_entities is not an array".into())
461 })?
462 .insert(index, Value::Null);
463 }
464 _ => {}
465 };
466 errors.push(error);
467 }
468 Self::Data {
469 data: value, key, ..
470 } => match key {
471 ResponseKey::RootField { ref name, .. } => {
472 data.insert(name.clone(), value);
473 }
474 ResponseKey::Entity { index, .. } => {
475 let entities = data
476 .entry(ENTITIES)
477 .or_insert(Value::Array(Vec::with_capacity(count)));
478 entities
479 .as_array_mut()
480 .ok_or_else(|| {
481 HandleResponseError::MergeError("_entities is not an array".into())
482 })?
483 .insert(index, value);
484 }
485 ResponseKey::EntityField {
486 index,
487 ref field_name,
488 ref typename,
489 ..
490 } => {
491 let entities = data
492 .entry(ENTITIES)
493 .or_insert(Value::Array(Vec::with_capacity(count)))
494 .as_array_mut()
495 .ok_or_else(|| {
496 HandleResponseError::MergeError("_entities is not an array".into())
497 })?;
498
499 match entities.get_mut(index) {
500 Some(Value::Object(entity)) => {
501 entity.insert(field_name.clone(), value);
502 }
503 _ => {
504 let mut entity = Map::new();
505 if let Some(typename) = typename {
506 entity.insert(TYPENAME, Value::String(typename.as_str().into()));
507 }
508 entity.insert(field_name.clone(), value);
509 entities.insert(index, Value::Object(entity));
510 }
511 };
512 }
513 ResponseKey::BatchEntity {
514 selection,
515 keys,
516 inputs,
517 } => {
518 let Value::Array(values) = value else {
519 return Err(HandleResponseError::MergeError(
520 "Response for a batch request does not map to an array".into(),
521 ));
522 };
523
524 let spec = selection.spec();
525 let key_selection = JSONSelection::parse_with_spec(
526 &keys.serialize().no_indent().to_string(),
527 spec,
528 )
529 .map_err(|e| HandleResponseError::MergeError(e.to_string()))?;
530
531 let key_values = inputs.batch.iter().map(|v| {
533 key_selection
534 .apply_to(&Value::Object(v.clone()))
535 .0
536 .unwrap_or(Value::Null)
537 });
538
539 let mut map = values
541 .into_iter()
542 .filter_map(|v| key_selection.apply_to(&v).0.map(|key| (key, v)))
543 .collect::<HashMap<_, _>>();
544
545 let new_entities = key_values
547 .map(|key| map.remove(&key).unwrap_or(Value::Null))
548 .collect_vec();
549
550 let entities = data
552 .entry(ENTITIES)
553 .or_insert(Value::Array(Vec::with_capacity(count)));
554
555 entities
556 .as_array_mut()
557 .ok_or_else(|| {
558 HandleResponseError::MergeError("_entities is not an array".into())
559 })?
560 .extend(new_entities);
561 }
562 },
563 }
564
565 Ok(())
566 }
567
568 pub fn problems(&self) -> &[Problem] {
569 match self {
570 Self::Error { problems, .. } | Self::Data { problems, .. } => problems,
571 }
572 }
573
574 pub fn apply_operation(
587 self, operation_option: Option<&ExecutableDocument>,
589 subtypes: &IndexMap<String, IndexSet<String>>,
590 ) -> Self {
591 match (self, operation_option) {
592 (
593 Self::Data {
594 data,
595 key,
596 problems,
597 },
598 Some(operation),
599 ) => {
600 let single_op = operation
601 .operations
602 .anonymous
603 .as_ref()
604 .or_else(|| operation.operations.named.values().next());
605
606 let data = if let Some(op) = single_op {
607 let mut new_sub = SelectionSet::new(op.selection_set.ty.clone());
608
609 match &key {
610 ResponseKey::RootField { name, .. } => {
611 for field in op.selection_set.selections.iter() {
612 if let Selection::Field(field) = field
613 && field.alias.as_deref().unwrap_or(field.name.as_str())
614 == name.as_str()
615 {
616 new_sub.ty = field.selection_set.ty.clone();
621 new_sub
622 .selections
623 .extend(field.selection_set.selections.iter().cloned());
624 }
625 }
626 }
627
628 ResponseKey::EntityField { field_name, .. } => {
629 let field_str = field_name.as_str();
630
631 for selection in op.selection_set.selections.iter() {
632 if let Selection::Field(field) = selection
633 && field.name.as_str() == "_entities"
634 {
635 for ent_sel in field.selection_set.selections.iter() {
636 match ent_sel {
641 Selection::InlineFragment(frag) => {
642 for field_sel in
643 frag.selection_set.selections.iter()
644 {
645 if let Selection::Field(field) = field_sel
646 && field.name.as_str() == field_str
647 {
648 new_sub.selections.extend(
649 field
650 .selection_set
651 .selections
652 .iter()
653 .cloned(),
654 );
655 }
656 }
657 }
658
659 Selection::Field(field) => {
660 if field.name.as_str() == field_str {
661 new_sub.selections.extend(
662 field
663 .selection_set
664 .selections
665 .iter()
666 .cloned(),
667 );
668 }
669 }
670
671 Selection::FragmentSpread(spread) => {
672 if let Some(fragment) = operation
673 .fragments
674 .get(spread.fragment_name.as_str())
675 {
676 for field_sel in
677 fragment.selection_set.selections.iter()
678 {
679 if let Selection::Field(field) = field_sel
680 && field.name.as_str() == field_str
681 {
682 new_sub.selections.extend(
683 field
684 .selection_set
685 .selections
686 .iter()
687 .cloned(),
688 );
689 }
690 }
691 }
692 }
693 }
694 }
695 }
696 }
697 }
698
699 ResponseKey::Entity { .. } => {
700 for selection in op.selection_set.selections.iter() {
701 if let Selection::Field(field) = selection
702 && field.name.as_str() == "_entities"
703 {
704 new_sub
705 .selections
706 .extend(field.selection_set.selections.iter().cloned());
707 }
708 }
709 }
710
711 ResponseKey::BatchEntity { keys, .. } => {
712 new_sub
713 .selections
714 .extend(keys.selection_set.selections.iter().cloned());
715
716 for selection in op.selection_set.selections.iter() {
717 if let Selection::Field(field) = selection
718 && field.name.as_str() == "_entities"
719 {
720 new_sub
721 .selections
722 .extend(field.selection_set.selections.iter().cloned());
723 }
724 }
725 }
726 };
727
728 GraphQLDataMapper::new(operation, subtypes).map_data(&data, &new_sub)
729 } else {
730 data
731 };
732
733 Self::Data {
734 data,
735 key,
736 problems,
737 }
738 }
739
740 (
742 MappedResponse::Error {
743 error,
744 key,
745 problems,
746 },
747 Some(_),
748 ) => MappedResponse::Error {
749 error,
750 key,
751 problems,
752 },
753
754 (mapped, None) => mapped,
756 }
757 }
758}
759
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use apollo_compiler::ExecutableDocument;
    use apollo_compiler::Schema;
    use http::HeaderMap;
    use http::HeaderValue;
    use http::StatusCode;
    use http::response::Parts;
    use serde_json_bytes::Value;
    use serde_json_bytes::json;

    use super::MappedResponse;
    use super::deserialize_response;
    use super::is_success;
    use crate::connectors::JSONSelection;
    use crate::connectors::runtime::inputs::RequestInputs;
    use crate::connectors::runtime::key::ResponseKey;

    /// Builds response `Parts` that carry nothing but the given status code.
    fn make_parts(status: u16) -> Parts {
        let code = StatusCode::from_u16(status).unwrap();
        let response = http::Response::builder().status(code).body(()).unwrap();
        let (parts, _body) = response.into_parts();
        parts
    }

    /// Builds a `HeaderMap` from name/value pairs; a repeated name replaces
    /// the earlier value.
    fn headers_with(pairs: &[(&str, &str)]) -> HeaderMap {
        let mut headers = HeaderMap::new();
        for &(name, value) in pairs {
            let name = http::header::HeaderName::from_bytes(name.as_bytes()).unwrap();
            let value = HeaderValue::from_str(value).unwrap();
            headers.insert(name, value);
        }
        headers
    }

    /// A string result from `isSuccess` fails the request and is reported.
    #[test]
    fn is_success_non_boolean_emits_warning() {
        let parts = make_parts(200);
        let data = json!({"status": "ok"});
        let selection = JSONSelection::parse("$.status").unwrap();

        let (success, problems) =
            is_success(Some(&selection), &data, &parts, &Default::default(), vec![]);

        assert!(!success, "non-boolean isSuccess should fail the request");
        assert_eq!(problems.len(), 1, "expected one problem, got: {problems:?}");
        assert!(
            problems[0].message.contains("string"),
            "problem message should mention the actual type, got: {:?}",
            problems[0].message
        );
    }

    /// An empty body maps to `null` even without a `Content-Length` header.
    #[test]
    fn empty_body_no_content_length_returns_null() {
        let result = deserialize_response(b"", &HeaderMap::new()).unwrap();
        assert_eq!(result, Value::Null);
    }

    /// An empty body with `Content-Length: 0` maps to `null`.
    #[test]
    fn empty_body_with_content_length_zero_returns_null() {
        let headers = headers_with(&[("content-length", "0")]);
        let result = deserialize_response(b"", &headers).unwrap();
        assert_eq!(result, Value::Null);
    }

    /// `apply_operation` finds the root field through its alias and renames
    /// nested fields to their aliases.
    #[test]
    fn test_apply_operation_with_root_and_field_aliases() {
        let schema = Schema::parse_and_validate(
            r#"
            type Query {
                search_items(query: String): SearchResponse
            }
            type SearchResponse {
                results: [Item!]!
                metadata: Metadata!
            }
            type Item {
                id: ID!
                title: String!
                viewUri: String!
            }
            type Metadata {
                total: Int!
            }
            "#,
            "schema.graphql",
        )
        .unwrap();

        let query = r#"
            {
                items:search_items(query: "test") {
                    results {
                        id
                        title
                        link:viewUri
                    }
                    metadata {
                        total
                    }
                }
            }
        "#;

        let operation =
            ExecutableDocument::parse_and_validate(&schema, query, "op.graphql").unwrap();

        // Data as produced by the connector mapping, keyed by schema field names.
        let mapped_data = json!({
            "results": [
                { "id": "1", "title": "First", "viewUri": "https://example.com/1" },
                { "id": "2", "title": "Second", "viewUri": "https://example.com/2" }
            ],
            "metadata": { "total": 2 }
        });

        // The key uses the aliased root field name.
        let key = ResponseKey::RootField {
            name: "items".to_string(),
            inputs: RequestInputs::default(),
            selection: Arc::new(JSONSelection::parse("$").unwrap()),
        };
        let response = MappedResponse::Data {
            key,
            data: mapped_data,
            problems: vec![],
        };

        let result = response.apply_operation(Some(&*operation), &Default::default());

        let MappedResponse::Data { data, .. } = result else {
            panic!("expected Data variant");
        };

        let items = data["results"].as_array().expect("results should be array");
        assert_eq!(items.len(), 2);

        assert_eq!(
            items[0]["link"].as_str(),
            Some("https://example.com/1"),
            "field alias 'link' should resolve to viewUri value"
        );
        assert_eq!(
            items[1]["link"].as_str(),
            Some("https://example.com/2"),
            "field alias 'link' should resolve to viewUri value"
        );
        assert!(
            items[0].get("viewUri").is_none(),
            "original field name should not appear in output when aliased"
        );
    }
}