1use apollo_compiler::ExecutableDocument;
2use apollo_compiler::Name;
3use apollo_compiler::collections::HashMap;
4use apollo_compiler::collections::IndexMap;
5use apollo_compiler::collections::IndexSet;
6use apollo_compiler::executable::Selection;
7use apollo_compiler::executable::SelectionSet;
8use encoding_rs::Encoding;
9use encoding_rs::UTF_8;
10use http::HeaderMap;
11use http::HeaderValue;
12use http::header::CONTENT_LENGTH;
13use http::header::CONTENT_TYPE;
14use http::response::Parts;
15use itertools::Itertools;
16use mime::Mime;
17use serde_json_bytes::ByteString;
18use serde_json_bytes::Map;
19use serde_json_bytes::Value;
20
21use crate::connectors::Connector;
22use crate::connectors::JSONSelection;
23use crate::connectors::ProblemLocation;
24use crate::connectors::runtime::errors::RuntimeError;
25use crate::connectors::runtime::inputs::ContextReader;
26use crate::connectors::runtime::key::ResponseKey;
27use crate::connectors::runtime::mapping::Problem;
28use crate::connectors::runtime::mapping::aggregate_apply_to_errors;
29use crate::connectors::runtime::responses::DeserializeError::ContentDecoding;
30
31const ENTITIES: &str = "_entities";
32const TYPENAME: &str = "__typename";
33
34#[derive(Debug, thiserror::Error)]
35pub enum HandleResponseError {
36 #[error("Merge error: {0}")]
37 MergeError(String),
38}
39
40pub fn deserialize_response(body: &[u8], headers: &HeaderMap) -> Result<Value, DeserializeError> {
42 if body.is_empty()
48 || headers
49 .get(CONTENT_LENGTH)
50 .and_then(|len| len.to_str().ok())
51 .and_then(|s| s.parse::<usize>().ok())
52 .is_some_and(|content_length| content_length == 0)
53 {
54 return Ok(Value::Null);
55 }
56
57 let content_type = headers
58 .get(CONTENT_TYPE)
59 .and_then(|h| h.to_str().ok()?.parse::<Mime>().ok());
60
61 if content_type.is_none()
62 || content_type
63 .as_ref()
64 .is_some_and(|ct| ct.subtype() == mime::JSON || ct.suffix() == Some(mime::JSON))
65 {
66 serde_json::from_slice::<Value>(body).map_err(DeserializeError::SerdeJson)
69 } else if content_type
70 .as_ref()
71 .is_some_and(|ct| ct.type_() == mime::TEXT && ct.subtype() == mime::PLAIN)
72 {
73 let encoding = content_type
76 .as_ref()
77 .and_then(|ct| Encoding::for_label(ct.get_param("charset")?.as_str().as_bytes()))
78 .unwrap_or(UTF_8);
79 let (decoded_body, _, had_errors) = encoding.decode(body);
80
81 if had_errors {
82 return Err(ContentDecoding(encoding.name()));
83 }
84
85 Ok(Value::String(decoded_body.into_owned().into()))
86 } else {
87 Ok(Value::Null)
89 }
90}
91
92#[derive(Debug, thiserror::Error)]
93pub enum DeserializeError {
94 #[error("Could not parse JSON: {0}")]
95 SerdeJson(#[source] serde_json::Error),
96 #[error("Could not decode data with content encoding {0}")]
97 ContentDecoding(&'static str),
98}
99
100pub fn handle_raw_response(
101 data: &Value,
102 parts: &Parts,
103 key: ResponseKey,
104 connector: &Connector,
105 context: impl ContextReader,
106 client_headers: &HeaderMap<HeaderValue>,
107) -> MappedResponse {
108 let inputs = key
109 .inputs()
110 .clone()
111 .merger(&connector.response_variable_keys)
112 .config(connector.config.as_ref())
113 .context(context)
114 .status(parts.status.as_u16())
115 .request(&connector.response_headers, client_headers)
116 .response(&connector.response_headers, Some(parts))
117 .merge();
118 let warnings = Vec::new();
119 let (success, warnings) = is_success(connector, data, parts, &inputs, warnings);
120 if success {
121 map_response(data, key, inputs, warnings)
122 } else {
123 map_error(connector, data, parts, key, inputs, warnings)
124 }
125}
126
127#[derive(Debug, Clone, Copy, PartialEq, Eq)]
128struct GraphQLDataMapper<'a> {
129 doc: &'a ExecutableDocument,
130 subtypes_map: &'a IndexMap<String, IndexSet<String>>,
131}
132
133impl<'a> GraphQLDataMapper<'a> {
134 fn new(
135 doc: &'a ExecutableDocument,
136 subtypes_map: &'a IndexMap<String, IndexSet<String>>,
137 ) -> Self {
138 Self { doc, subtypes_map }
139 }
140
141 fn fragment_matches(&self, data: &Value, fragment_type_condition: &Name) -> bool {
142 if let Some(data_typename) = data.get("__typename") {
143 match data_typename {
144 Value::String(typename) => {
145 self.supertype_has_subtype(fragment_type_condition.as_str(), typename.as_str())
146 }
147 _ => false,
148 }
149 } else {
150 true
151 }
152 }
153
154 fn supertype_has_subtype(&self, supertype: &str, subtype: &str) -> bool {
155 if supertype == subtype {
156 true
157 } else if let Some(subtypes) = self.subtypes_map.get(supertype) {
158 subtypes
159 .iter()
160 .any(|s| self.supertype_has_subtype(s, subtype))
161 } else {
162 false
163 }
164 }
165
166 fn map_data(&self, data: &Value, selection_set: &SelectionSet) -> Value {
167 if selection_set.selections.is_empty() {
168 return data.clone();
169 }
170
171 match data {
172 Value::Object(map) => {
173 let mut new_map = Map::new();
174
175 for field in selection_set.selections.iter() {
176 match field {
177 Selection::Field(field) => {
178 if let Some(field_value) = map.get(field.name.as_str()) {
179 let output_field_name = field.alias.as_ref().unwrap_or(&field.name);
180 new_map.insert(
181 output_field_name.to_string(),
182 self.map_data(field_value, &field.selection_set),
183 );
184 } else if field.name == TYPENAME {
185 let output_field_name = field.alias.as_ref().unwrap_or(&field.name);
190 new_map.insert(
191 output_field_name.to_string(),
192 Value::String(selection_set.ty.to_string().into()),
193 );
194 }
195 }
196
197 Selection::FragmentSpread(spread) => {
198 if let Some(fragment) =
199 self.doc.fragments.get(spread.fragment_name.as_str())
200 && self.fragment_matches(data, fragment.type_condition())
201 {
202 let mapped = self.map_data(data, &fragment.selection_set);
203 if let Some(fragment_map) = mapped.as_object() {
204 new_map.extend(fragment_map.clone());
205 }
206 }
207 }
208
209 Selection::InlineFragment(fragment) => {
210 if let Some(type_condition) = &fragment.type_condition
211 && !self.fragment_matches(data, type_condition)
212 {
213 continue;
214 }
215 let mapped = self.map_data(data, &fragment.selection_set);
216 if let Some(fragment_map) = mapped.as_object() {
217 new_map.extend(fragment_map.clone());
218 }
219 }
220 }
221 }
222
223 Value::Object(new_map)
224 }
225
226 Value::Array(items) => Value::Array(
227 items
228 .iter()
229 .map(|item| self.map_data(item, selection_set))
230 .collect(),
231 ),
232
233 primitive => primitive.clone(),
234 }
235 }
236}
237
238fn is_success(
241 connector: &Connector,
242 data: &Value,
243 parts: &Parts,
244 inputs: &IndexMap<String, Value>,
245 mut warnings: Vec<Problem>,
246) -> (bool, Vec<Problem>) {
247 let Some(is_success_selection) = &connector.error_settings.connect_is_success else {
248 return (parts.status.is_success(), warnings);
249 };
250 let (res, apply_to_errors) = is_success_selection.apply_with_vars(data, inputs);
251 warnings.extend(aggregate_apply_to_errors(
252 apply_to_errors,
253 ProblemLocation::IsSuccess,
254 ));
255
256 (
257 res.as_ref().and_then(Value::as_bool).unwrap_or_default(),
258 warnings,
259 )
260}
261
262pub fn handle_mapping_only_response(
266 key: ResponseKey,
267 connector: &Connector,
268 context: impl ContextReader,
269 client_headers: &HeaderMap<HeaderValue>,
270) -> MappedResponse {
271 let data = Value::Object(Map::new());
272 let inputs = key
273 .inputs()
274 .clone()
275 .merger(&connector.response_variable_keys)
276 .config(connector.config.as_ref())
277 .context(context)
278 .request(&connector.response_headers, client_headers)
279 .merge();
280 map_response(&data, key, inputs, Vec::new())
281}
282
283pub(super) fn map_response(
285 data: &Value,
286 key: ResponseKey,
287 inputs: IndexMap<String, Value>,
288 mut warnings: Vec<Problem>,
289) -> MappedResponse {
290 let (res, apply_to_errors) = key.selection().apply_with_vars(data, &inputs);
291 warnings.extend(aggregate_apply_to_errors(
292 apply_to_errors,
293 ProblemLocation::Selection,
294 ));
295 MappedResponse::Data {
296 key,
297 data: res.unwrap_or_else(|| Value::Null),
298 problems: warnings,
299 }
300}
301
302pub(super) fn map_error(
304 connector: &Connector,
305 data: &Value,
306 parts: &Parts,
307 key: ResponseKey,
308 inputs: IndexMap<String, Value>,
309 mut warnings: Vec<Problem>,
310) -> MappedResponse {
311 let message = if let Some(message_selection) = &connector.error_settings.message {
313 let (res, apply_to_errors) = message_selection.apply_with_vars(data, &inputs);
314 warnings.extend(aggregate_apply_to_errors(
315 apply_to_errors,
316 ProblemLocation::ErrorsMessage,
317 ));
318 res.as_ref()
319 .and_then(Value::as_str)
320 .unwrap_or_default()
321 .to_string()
322 } else {
323 "Request failed".to_string()
324 };
325
326 let mut error = RuntimeError::new(message, &key);
328 error.subgraph_name = Some(connector.id.subgraph_name.clone());
329 error.coordinate = Some(connector.id.coordinate());
330
331 error = error.extension(
333 "http",
334 Value::Object(Map::from_iter([(
335 "status".into(),
336 Value::Number(parts.status.as_u16().into()),
337 )])),
338 );
339
340 let mut extension_code = "CONNECTOR_FETCH".to_string();
345 if let Some(extensions_selection) = &connector.error_settings.source_extensions {
346 let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
347 warnings.extend(aggregate_apply_to_errors(
348 apply_to_errors,
349 ProblemLocation::SourceErrorsExtensions,
350 ));
351
352 let extensions = res
354 .and_then(|e| match e {
355 Value::Object(map) => Some(map),
356 _ => None,
357 })
358 .unwrap_or_default();
359
360 if let Some(code) = extensions.get("code") {
361 extension_code = code.as_str().unwrap_or_default().to_string();
362 }
363
364 for (key, value) in extensions {
365 error = error.extension(key, value);
366 }
367 }
368
369 if let Some(extensions_selection) = &connector.error_settings.connect_extensions {
370 let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
371 warnings.extend(aggregate_apply_to_errors(
372 apply_to_errors,
373 ProblemLocation::ConnectErrorsExtensions,
374 ));
375
376 let extensions = res
378 .and_then(|e| match e {
379 Value::Object(map) => Some(map),
380 _ => None,
381 })
382 .unwrap_or_default();
383
384 if let Some(code) = extensions.get("code") {
385 extension_code = code.as_str().unwrap_or_default().to_string();
386 }
387
388 for (key, value) in extensions {
389 error = error.extension(key, value);
390 }
391 }
392
393 error = error.with_code(extension_code);
394
395 MappedResponse::Error {
396 error,
397 key,
398 problems: warnings,
399 }
400}
401#[derive(Debug)]
403pub enum MappedResponse {
404 Error {
407 error: RuntimeError,
408 key: ResponseKey,
409 problems: Vec<Problem>,
410 },
411 Data {
413 data: Value,
414 key: ResponseKey,
415 problems: Vec<Problem>,
416 },
417}
418
419impl MappedResponse {
420 pub fn add_to_data(
424 self,
425 data: &mut Map<ByteString, Value>,
426 errors: &mut Vec<RuntimeError>,
427 count: usize,
428 ) -> Result<(), HandleResponseError> {
429 match self {
430 Self::Error { error, key, .. } => {
431 match key {
432 ResponseKey::Entity { index, .. } | ResponseKey::EntityField { index, .. } => {
434 let entities = data
435 .entry(ENTITIES)
436 .or_insert(Value::Array(Vec::with_capacity(count)));
437 entities
438 .as_array_mut()
439 .ok_or_else(|| {
440 HandleResponseError::MergeError("_entities is not an array".into())
441 })?
442 .insert(index, Value::Null);
443 }
444 _ => {}
445 };
446 errors.push(error);
447 }
448 Self::Data {
449 data: value, key, ..
450 } => match key {
451 ResponseKey::RootField { ref name, .. } => {
452 data.insert(name.clone(), value);
453 }
454 ResponseKey::Entity { index, .. } => {
455 let entities = data
456 .entry(ENTITIES)
457 .or_insert(Value::Array(Vec::with_capacity(count)));
458 entities
459 .as_array_mut()
460 .ok_or_else(|| {
461 HandleResponseError::MergeError("_entities is not an array".into())
462 })?
463 .insert(index, value);
464 }
465 ResponseKey::EntityField {
466 index,
467 ref field_name,
468 ref typename,
469 ..
470 } => {
471 let entities = data
472 .entry(ENTITIES)
473 .or_insert(Value::Array(Vec::with_capacity(count)))
474 .as_array_mut()
475 .ok_or_else(|| {
476 HandleResponseError::MergeError("_entities is not an array".into())
477 })?;
478
479 match entities.get_mut(index) {
480 Some(Value::Object(entity)) => {
481 entity.insert(field_name.clone(), value);
482 }
483 _ => {
484 let mut entity = Map::new();
485 if let Some(typename) = typename {
486 entity.insert(TYPENAME, Value::String(typename.as_str().into()));
487 }
488 entity.insert(field_name.clone(), value);
489 entities.insert(index, Value::Object(entity));
490 }
491 };
492 }
493 ResponseKey::BatchEntity {
494 selection,
495 keys,
496 inputs,
497 } => {
498 let Value::Array(values) = value else {
499 return Err(HandleResponseError::MergeError(
500 "Response for a batch request does not map to an array".into(),
501 ));
502 };
503
504 let spec = selection.spec();
505 let key_selection = JSONSelection::parse_with_spec(
506 &keys.serialize().no_indent().to_string(),
507 spec,
508 )
509 .map_err(|e| HandleResponseError::MergeError(e.to_string()))?;
510
511 let key_values = inputs.batch.iter().map(|v| {
513 key_selection
514 .apply_to(&Value::Object(v.clone()))
515 .0
516 .unwrap_or(Value::Null)
517 });
518
519 let mut map = values
521 .into_iter()
522 .filter_map(|v| key_selection.apply_to(&v).0.map(|key| (key, v)))
523 .collect::<HashMap<_, _>>();
524
525 let new_entities = key_values
527 .map(|key| map.remove(&key).unwrap_or(Value::Null))
528 .collect_vec();
529
530 let entities = data
532 .entry(ENTITIES)
533 .or_insert(Value::Array(Vec::with_capacity(count)));
534
535 entities
536 .as_array_mut()
537 .ok_or_else(|| {
538 HandleResponseError::MergeError("_entities is not an array".into())
539 })?
540 .extend(new_entities);
541 }
542 },
543 }
544
545 Ok(())
546 }
547
548 pub fn problems(&self) -> &[Problem] {
549 match self {
550 Self::Error { problems, .. } | Self::Data { problems, .. } => problems,
551 }
552 }
553
554 pub fn apply_operation(
567 self, operation_option: Option<&ExecutableDocument>,
569 subtypes: &IndexMap<String, IndexSet<String>>,
570 ) -> Self {
571 match (self, operation_option) {
572 (
573 Self::Data {
574 data,
575 key,
576 problems,
577 },
578 Some(operation),
579 ) => {
580 let single_op = operation
581 .operations
582 .anonymous
583 .as_ref()
584 .or_else(|| operation.operations.named.values().next());
585
586 let data = if let Some(op) = single_op {
587 let mut new_sub = SelectionSet::new(op.selection_set.ty.clone());
588
589 match &key {
590 ResponseKey::RootField { name, .. } => {
591 for field in op.selection_set.selections.iter() {
592 if let Selection::Field(field) = field
593 && field.alias.as_deref().unwrap_or(field.name.as_str())
594 == name.as_str()
595 {
596 new_sub.ty = field.selection_set.ty.clone();
601 new_sub
602 .selections
603 .extend(field.selection_set.selections.iter().cloned());
604 }
605 }
606 }
607
608 ResponseKey::EntityField { field_name, .. } => {
609 let field_str = field_name.as_str();
610
611 for selection in op.selection_set.selections.iter() {
612 if let Selection::Field(field) = selection
613 && field.name.as_str() == "_entities"
614 {
615 for ent_sel in field.selection_set.selections.iter() {
616 match ent_sel {
621 Selection::InlineFragment(frag) => {
622 for field_sel in
623 frag.selection_set.selections.iter()
624 {
625 if let Selection::Field(field) = field_sel
626 && field.name.as_str() == field_str
627 {
628 new_sub.selections.extend(
629 field
630 .selection_set
631 .selections
632 .iter()
633 .cloned(),
634 );
635 }
636 }
637 }
638
639 Selection::Field(field) => {
640 if field.name.as_str() == field_str {
641 new_sub.selections.extend(
642 field
643 .selection_set
644 .selections
645 .iter()
646 .cloned(),
647 );
648 }
649 }
650
651 Selection::FragmentSpread(spread) => {
652 if let Some(fragment) = operation
653 .fragments
654 .get(spread.fragment_name.as_str())
655 {
656 for field_sel in
657 fragment.selection_set.selections.iter()
658 {
659 if let Selection::Field(field) = field_sel
660 && field.name.as_str() == field_str
661 {
662 new_sub.selections.extend(
663 field
664 .selection_set
665 .selections
666 .iter()
667 .cloned(),
668 );
669 }
670 }
671 }
672 }
673 }
674 }
675 }
676 }
677 }
678
679 ResponseKey::Entity { .. } => {
680 for selection in op.selection_set.selections.iter() {
681 if let Selection::Field(field) = selection
682 && field.name.as_str() == "_entities"
683 {
684 new_sub
685 .selections
686 .extend(field.selection_set.selections.iter().cloned());
687 }
688 }
689 }
690
691 ResponseKey::BatchEntity { keys, .. } => {
692 new_sub
693 .selections
694 .extend(keys.selection_set.selections.iter().cloned());
695
696 for selection in op.selection_set.selections.iter() {
697 if let Selection::Field(field) = selection
698 && field.name.as_str() == "_entities"
699 {
700 new_sub
701 .selections
702 .extend(field.selection_set.selections.iter().cloned());
703 }
704 }
705 }
706 };
707
708 GraphQLDataMapper::new(operation, subtypes).map_data(&data, &new_sub)
709 } else {
710 data
711 };
712
713 Self::Data {
714 data,
715 key,
716 problems,
717 }
718 }
719
720 (
722 MappedResponse::Error {
723 error,
724 key,
725 problems,
726 },
727 Some(_),
728 ) => MappedResponse::Error {
729 error,
730 key,
731 problems,
732 },
733
734 (mapped, None) => mapped,
736 }
737 }
738}
739
740#[cfg(test)]
741mod tests {
742 use std::sync::Arc;
743
744 use apollo_compiler::ExecutableDocument;
745 use apollo_compiler::Schema;
746 use http::HeaderMap;
747 use http::HeaderValue;
748 use serde_json_bytes::Value;
749 use serde_json_bytes::json;
750
751 use super::MappedResponse;
752 use super::deserialize_response;
753 use crate::connectors::JSONSelection;
754 use crate::connectors::runtime::inputs::RequestInputs;
755 use crate::connectors::runtime::key::ResponseKey;
756
757 fn headers_with(pairs: &[(&str, &str)]) -> HeaderMap {
758 let mut map = HeaderMap::new();
759 for (k, v) in pairs {
760 map.insert(
761 http::header::HeaderName::from_bytes(k.as_bytes()).unwrap(),
762 HeaderValue::from_str(v).unwrap(),
763 );
764 }
765 map
766 }
767
768 #[test]
769 fn empty_body_no_content_length_returns_null() {
770 let headers = HeaderMap::new();
772 let result = deserialize_response(b"", &headers).unwrap();
773 assert_eq!(result, Value::Null);
774 }
775
776 #[test]
777 fn empty_body_with_content_length_zero_returns_null() {
778 let headers = headers_with(&[("content-length", "0")]);
780 let result = deserialize_response(b"", &headers).unwrap();
781 assert_eq!(result, Value::Null);
782 }
783
784 #[test]
785 fn test_apply_operation_with_root_and_field_aliases() {
786 let schema = Schema::parse_and_validate(
787 r#"
788 type Query {
789 search_items(query: String): SearchResponse
790 }
791 type SearchResponse {
792 results: [Item!]!
793 metadata: Metadata!
794 }
795 type Item {
796 id: ID!
797 title: String!
798 viewUri: String!
799 }
800 type Metadata {
801 total: Int!
802 }
803 "#,
804 "schema.graphql",
805 )
806 .unwrap();
807
808 let query = r#"
809 {
810 items:search_items(query: "test") {
811 results {
812 id
813 title
814 link:viewUri
815 }
816 metadata {
817 total
818 }
819 }
820 }
821 "#;
822
823 let operation =
824 ExecutableDocument::parse_and_validate(&schema, query, "op.graphql").unwrap();
825
826 let mapped_data = json!({
827 "results": [
828 { "id": "1", "title": "First", "viewUri": "https://example.com/1" },
829 { "id": "2", "title": "Second", "viewUri": "https://example.com/2" }
830 ],
831 "metadata": { "total": 2 }
832 });
833
834 let response = MappedResponse::Data {
835 key: ResponseKey::RootField {
836 name: "items".to_string(),
837 inputs: RequestInputs::default(),
838 selection: Arc::new(JSONSelection::parse("$").unwrap()),
839 },
840 data: mapped_data,
841 problems: vec![],
842 };
843
844 let result = response.apply_operation(Some(&*operation), &Default::default());
845
846 let MappedResponse::Data { data, .. } = result else {
847 panic!("expected Data variant");
848 };
849
850 let items = data["results"].as_array().expect("results should be array");
851 assert_eq!(items.len(), 2);
852
853 assert_eq!(
855 items[0]["link"].as_str(),
856 Some("https://example.com/1"),
857 "field alias 'link' should resolve to viewUri value"
858 );
859 assert_eq!(
860 items[1]["link"].as_str(),
861 Some("https://example.com/2"),
862 "field alias 'link' should resolve to viewUri value"
863 );
864 assert!(
865 items[0].get("viewUri").is_none(),
866 "original field name should not appear in output when aliased"
867 );
868 }
869}